The place I saw the failure (now apparently fixed) was in taking a range of a 
vector that had size == 0, where the size was not checked. I assume the real 
problem occurred earlier. So the exception is always a bad vector range.

We may also have empty partitions, not sure. The way to reproduce the error is 
to have a bunch of small input files with one tuple per line and 0 or more 
lines per file. They are used to create a matrix of vectors with a 
.groupByKey().map, the key being an Int in this case. See the element reader, 
which reads a tuple per line:
https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala

The dimensionality is always correct by ncol and nrow AFAIK. I’ve checked in 
the recent case. And since this can happen for A’A only, there has been no 
dimensionality adjustment to allow empty rows. This means that there are 
guaranteed to be no empty blocks in mapBlock, for what it’s worth. I have not 
checked that every vector has the same cardinality but would be surprised if 
they differed, given the code.

Skewed or incorrect partitioning, given that I don’t know what that means, may 
or may not happen. If it needs to be checked or adjusted it certainly could be 
done. It would probably be best to do it in some variant or method of 
CheckpointedDrm, right?

This recent case seemed to be fixed by drm.repartition(someSmallNumber) but I 
imagine this is accidental and not even a good hack. Given the way the data was 
read this is likely to reduce the number of partitions drastically.

There is one situation that seems to surface this problem most often. In
  var columns = mc.textFile(source).map { line => line.split(delimiter) }
the source string is a very long list of files or directories, created from 
small DStream batch RDDs. When textFile looks at this it reads all the HDFS 
file status objects to determine partitioning. This can take a long time, so 
it may be better to block up the files and read them in .map( read some 
files ). This was the advice of a Spark committer. This seems to be a good case 
for sc.textFile() having its own optimization step, to handle part files 
written by Spark Streaming or large files written by some other mechanism.
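
As a rough sketch of the "block up the files" idea, here is one way to split 
the long source string into smaller batches before reading. The names 
(FileBatching, batchSources) and the batch size are made up for illustration, 
and I'm assuming source is a comma-separated path list, which is the form 
textFile accepts. How each batch is then read (one textFile call per batch 
plus a union, or reading inside a .map as suggested) is left open here.

```scala
// Hypothetical helper, not part of Mahout or Spark.
object FileBatching {
  // Split a comma-separated list of paths into batches of at most
  // batchSize paths; each batch is re-joined with commas so it can be
  // handed to a reader on its own.
  def batchSources(source: String, batchSize: Int): List[String] = {
    require(batchSize > 0, "batchSize must be positive")
    source.split(",")
      .map(_.trim)
      .filter(_.nonEmpty)      // drop empty entries, e.g. from a trailing comma
      .grouped(batchSize)      // iterator of arrays with <= batchSize paths
      .map(_.mkString(","))
      .toList
  }
}
```

For example, FileBatching.batchSources("a,b,c,d,e", 2) gives three strings: 
"a,b", "c,d" and "e".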

Not sure if these issues are related but it is the thing in common between the 
last two occurrences of the vector range problem.

So it seems like we have two things to look at post 0.10.0:
1) Do we need an rdd validator or smart repartitioner, and what exactly should 
it do?
2) What should we do to optimize reading many small files vs. fewer large 
files? It seems that Spark ignores the distinction, whether reading sequence 
files or text files.
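
For 1), a minimal sketch of what a per-partition validator might look like, 
assuming the two checks Dmitriy lists below (at least one row per partition, 
and the same cardinality for every vector). The names (RddChecks, 
checkPartition, card) are hypothetical. It is written against a plain 
Iterator so it does not depend on Spark or Mahout types, and it collects every 
bad row, which a real version would probably want to cap.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Mahout or Spark.
object RddChecks {
  // Check one partition's rows. `card` extracts a vector's cardinality,
  // `ncol` is the expected cardinality. Returns one message per problem;
  // an empty result means the partition passed both checks.
  def checkPartition[K, V](idx: Int, rows: Iterator[(K, V)], ncol: Int)
                          (card: V => Int): Iterator[String] = {
    val problems = ArrayBuffer.empty[String]
    var count = 0L
    rows.foreach { case (key, vec) =>
      count += 1
      val c = card(vec)
      if (c != ncol)
        problems += s"partition $idx: row $key has cardinality $c, expected $ncol"
    }
    // Every partition is required to hold at least one row vector.
    if (count == 0) problems += s"partition $idx: empty"
    problems.iterator
  }
}
```

On a real rdd of (Int, Vector) I would expect something like 
rdd.mapPartitionsWithIndex((idx, rows) => RddChecks.checkPartition(idx, rows, ncol)(_.size)).collect() 
to run this as a map-only pass, but I have not tried it.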


On Apr 6, 2015, at 4:03 PM, Dmitriy Lyubimov <[email protected]> wrote:

PS  problems of rdds we are trying to feed to drmWrap() are mostly of 2
kinds:

(1) skewed/incorrect partitioning, e.g. due to prefiltering or degenerate
splitting (at least 1 row vector is required in every partition);

(2) invalid data dimensionality (although some operators may be forgiving
of that, in general, all vectors in a row-partitioned format must have the
same cardinality).

Either problem is the responsibility of the data loader (e.g. drmDfsRead()), 
not algebra's. So no need to try to hack it in the optimizer's guts.


On Mon, Apr 6, 2015 at 3:56 PM, Dmitriy Lyubimov <[email protected]> wrote:

> I probably will try to answer these on the list. Slack is only on my phone
> now.
> 
> (1) par, generally, does not do shuffle. Internally its implementation
> largely relies on the shuffle-less coalesce() Spark api.
> 
> What it means, it will do great job reducing or increasing parallelism 5x
> or more without doing shuffle and observing approximate uniformity of
> splits.
> 
> (2) as a corollary, it means it will not eliminate problem of empty
> partitions.
> 
> (3) optimizer will not create problems in RDDs if initial rdd passed to
> drmWrap() did not have problems.
> 
> (4) optimizer does not validate rdds (in drmWrap() or elsewhere) for
> correctness for expense reasons.
> 
> However, we probably may want to create a routine that validates internal
> rdd structure (as a map-only or all-reduce op) which can be used by tools
> like data importers before passing data to algebra.
> 
> -d
> 
