[ 
https://issues.apache.org/jira/browse/SYSTEMML-951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15563583#comment-15563583
 ] 

Mike Dusenberry edited comment on SYSTEMML-951 at 10/10/16 9:32 PM:
--------------------------------------------------------------------

[~mboehm7]  Excellent!  I gave the new patch a try, and here are a few thoughts:

1.  On trying it, I ran into a {{NoSuchElementException}} from the 
{{rdd.partitioner().get()}} call in {{isMultiBlockLookup}} because the 
partitioner was "None".  I think we should guard against this with an 
{{isPresent}} call.
2.  This raises the bigger question of why the internal RDD had no 
partitioner in the first place.  I was using the same 
example of passing a {{DataFrame}} into {{MLContext}}.  Looking into it, 
{{Script.in(...)}} calls {{MLContextUtil.convertInputType(...)}}, which calls 
[{{MLContextConversionUtil.dataFrameToMatrixObject(...)}} | 
https://github.com/apache/incubator-systemml/blob/d9de9c7927915e411692df79c90afd0eaa6323e0/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java#L320-L320].
  This method then converts the {{DataFrame}} to a binary blocked matrix RDD 
({{JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock}}).  That conversion 
eventually calls {{combineByKey(...)}}, so the {{binaryBlock}} RDD is correctly 
hash partitioned with a {{HashPartitioner}}.  [Line 325 | 
https://github.com/apache/incubator-systemml/blob/d9de9c7927915e411692df79c90afd0eaa6323e0/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java#L325]
 then converts this {{binaryBlock}} RDD to a {{MatrixObject}} via 
{{binaryBlocksToMatrixObject(...)}}.  This method does a {{mapToPair(...)}} on 
[line 244 | 
https://github.com/apache/incubator-systemml/blob/d9de9c7927915e411692df79c90afd0eaa6323e0/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java#L244]
 to do a deep copy of the binary blocked RDD.  Since map operations can change 
keys, they do not contractually preserve partitioning, so the partitioner of 
the resulting RDD is "None".  This RDD is what is used internally, and thus it 
doesn't have a partitioner.  So the question: do we need this 
{{mapToPair(...)}} call?   Even with that line removed, we still have an 
issue -- by the time the {{isMultiBlockLookup}} method is called, we have 
called {{coalesce}} at 
[{{CheckpointSPInstruction:112}} | 
https://github.com/apache/incubator-systemml/blob/7c7d20b82c8b7edf9551e154c8f8cd15dc3ca65f/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java#L112-L112]
 in my particular example, and thus the partitioning is lost again.  In this 
case, my initial {{DataFrame}} has 20,000 partitions, a number I found to be 
stable during earlier preprocessing (i.e., not hitting the 2GB partition limit 
on regular Spark jobs, avoiding general Spark OOM errors, etc.).
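
For item 1, the guard could look roughly like the sketch below.  It uses {{java.util.Optional}} as a stand-in for whatever optional type {{rdd.partitioner()}} returns, and the {{Partitioner}} interface, class name, and method name are hypothetical placeholders rather than code from the patch:

```java
import java.util.Optional;

class PartitionerGuard {
    // Hypothetical stand-in for a Spark partitioner; only the size matters here.
    interface Partitioner {
        int numPartitions();
    }

    // Returns the partition count, or -1 if the RDD carries no partitioner.
    // Checking isPresent() first avoids the exception thrown by get() on an
    // empty Optional.
    static int numPartitionsOrMinusOne(Optional<Partitioner> partitioner) {
        return partitioner.isPresent() ? partitioner.get().numPartitions() : -1;
    }

    public static void main(String[] args) {
        Partitioner hash = () -> 200;
        System.out.println(numPartitionsOrMinusOne(Optional.of(hash)));
        System.out.println(numPartitionsOrMinusOne(Optional.empty()));
    }
}
```

With a guard of this shape, {{isMultiBlockLookup}} could simply return false when no partitioner is present and fall back to the scan-based path.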

Overall, I'm excited about this patch; it looks like we just need to explore a 
few more things.


> Efficient spark right indexing via lookup
> -----------------------------------------
>
>                 Key: SYSTEMML-951
>                 URL: https://issues.apache.org/jira/browse/SYSTEMML-951
>             Project: SystemML
>          Issue Type: Task
>          Components: Runtime
>            Reporter: Matthias Boehm
>            Assignee: Matthias Boehm
>         Attachments: mnist_softmax_v1.dml, mnist_softmax_v2.dml
>
>
> So far all versions of spark right indexing instructions require a full scan 
> over the data set. In case of existing partitioning (which anyway happens for 
> any external format - binary block conversion) such a full scan is 
> unnecessary if we're only interested in a small subset of the data. This task 
> adds an efficient right indexing operation via 'rdd lookups' which access at 
> most <num_lookup> partitions given existing hash partitioning. 
> cc [[email protected]]  
> In detail, this task covers the following improvements for spark matrix right 
> indexing. Frames are not covered here because they allow variable-length 
> blocks. Also, note that it is important to differentiate between in-core and 
> out-of-core matrices: for in-core matrices (i.e., matrices that fit in 
> deserialized form into aggregated memory), the full scan is actually not 
> problematic as the filter operation only scans keys without touching the 
> actual values.
> (1) Scan-based indexing w/o aggregation: So far, we apply aggregations to 
> merge partial blocks very conservatively. However, if the indexing range is 
> block aligned (e.g., the range starts at a block boundary or lies within a 
> single block), this is unnecessary. This alone led to a 2x improvement for indexing 
> row batches out of an in-core matrix.
> (2) Single-block lookup: If the indexing range covers a subrange of a single 
> block, we directly perform a lookup. On in-core matrices this gives a minor 
> improvement (but does not hurt) while on out-of-core matrices, the 
> improvement is huge in case of existing partitioner as we only have to scan a 
> single partition instead of the entire data.
> (3) Multi-block lookups: Unfortunately, Spark does not provide a lookup for a 
> list of keys. So the next best option is a data-query join (in case of 
> existing partitioner) with {{data.join(filter).map()}}, which works very well 
> for in-core data sets, but for out-of-core datasets, unfortunately, does not 
> exploit the potential for partition pruning and thus reads the entire data. I 
> also experimented with a custom multi-block lookup that runs multiple lookups 
> in a multi-threaded fashion - this gave the expected pruning but was very 
> ugly due to an unbounded number of jobs. 
> In conclusion, I'll create a patch for scenarios (1) and (2), while scenario 
> (3) requires some more thought and is postponed until after the 0.11 release. One 
> idea would be to create a custom RDD that implements {{lookup(List<T> keys)}} 
> by constructing a pruned set of input partitions via 
> {{partitioner.getPartition(key)}}. cc [~freiss] [~niketanpansare] [~reinwald]
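
The pruning idea at the end of the description can be sketched in plain Java, without Spark.  The sketch assumes hash partitioning by non-negative modulo of the key's hash code; the class and method names, and the string encoding of block keys, are hypothetical and only serve the illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

class PrunedLookup {
    // Non-negative modulo of the key's hash code (assumed hash partitioning).
    static int getPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Distinct partitions that can contain any of the requested keys; a
    // multi-key lookup would only need to read these partitions.
    static SortedSet<Integer> prunedPartitions(List<?> keys, int numPartitions) {
        SortedSet<Integer> parts = new TreeSet<>();
        for (Object key : keys) {
            parts.add(getPartition(key, numPartitions));
        }
        return parts;
    }

    public static void main(String[] args) {
        // Block keys encoded as "rowBlock,colBlock" strings for illustration only.
        List<String> keys = Arrays.asList("1,1", "1,2", "7,1");
        SortedSet<Integer> parts = prunedPartitions(keys, 20000);
        System.out.println(parts.size() + " of 20000 partitions need to be read");
    }
}
```

The number of partitions read is bounded by the number of requested keys, which matches the "at most <num_lookup> partitions" bound stated above, and all keys can be served by a single job instead of one job per lookup.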



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
