[ https://issues.apache.org/jira/browse/SYSTEMML-951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15606177#comment-15606177 ]
Mike Dusenberry commented on SYSTEMML-951:
------------------------------------------
As a follow-up: in general, this partition-pruning right indexing is really
nice! In my particular situation, I am using {{MLContext}} to first convert
{{DataFrames}} to SystemML {{Matrix}} objects with a simple script. As long as
no operation, such as {{sum}}, is performed, this transformation is fairly
lazy, aside from about 5 minutes on 1.5 TB due to a {{count}} call in
[{{RDDConverterUtils::290}}|https://github.com/apache/incubator-systemml/blob/711db680c65215a22f97239909eda811ddfb5a1d/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java#L290-L290].
(1) We should investigate whether that {{count}} call is necessary (it
computes the number of non-zeros of the incoming DataFrame).
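For context, the conversion setup is essentially the following (a minimal
sketch using the new {{MLContext}} API; the script body and variable names are
just illustrative):
{code:scala}
import org.apache.spark.SparkContext
import org.apache.sysml.api.mlcontext.MLContext
import org.apache.sysml.api.mlcontext.ScriptFactory.dml

// assume sc: SparkContext and df: DataFrame (the features) already exist
val ml = new MLContext(sc)

// identity script: binds the DataFrame as a matrix without computing on it,
// so the DataFrame -> binary-block conversion stays lazy (aside from the
// count/nnz call noted above)
val script = dml("X = X_in").in("X_in", df).out("X")
val X = ml.execute(script).getMatrix("X")
{code}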
Once a real operation is performed on the matrix in DML for the first time, it
will invoke a full shuffle -- this can take a bit of time, maybe 10-15 minutes
-- but once it has completed, subsequent steps simply read from the shuffle
files. Then the actual conversion of each partition occurs. The great thing
about the partition pruning is that if a single row of blocks is accessed,
only the corresponding partitions incur the conversion overhead at that
moment. This spreads the cost of conversion across the first full loop through
the dataset, rather than concentrating it entirely at the beginning, and thus
allows for quicker iteration after seeing the results of a few mini-batches.
Additionally, if one is working in a notebook or REPL, a job can be stopped
early for script adjustments (without stopping the overall Spark session), and
the underlying RDDs will maintain their caches, allowing for quick interactive
work. In contrast, performing the full shuffle and conversion up front can
take a few hours for a dataset of this size.
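To make the mini-batch access pattern concrete, here is a rough sketch
(hypothetical batch size and script, reusing {{ml}} and {{X}} from the sketch
above; the point is that each {{X[beg:end,]}} access only converts the
partitions holding those row blocks):
{code:scala}
// mini-batch sweep: with partition pruning, each right-indexing access pays
// the conversion cost only for the partitions it actually touches
val batchScript = dml(
  """
    |B = 512
    |for (beg in seq(1, nrow(X), B)) {
    |  end = min(beg + B - 1, nrow(X))
    |  X_batch = X[beg:end,]
    |  # ... per-batch work, e.g. a gradient step ...
    |}
  """.stripMargin).in("X", X)
ml.execute(batchScript)
{code}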
Based on this, I've also been experimenting with removing the out-of-core
requirement and simply applying this partition pruning to all datasets. For
the above reasons, this has shown great results. Additionally, it is often the
case that a given matrix fits in memory by itself but does not fit in
combination with others. In that case, our current rules would use a filter
rather than the partition pruning, even though the matrix may actually be
partially on disk. The partition pruning is quite useful for this situation as
well.
> Efficient spark right indexing via lookup
> -----------------------------------------
>
> Key: SYSTEMML-951
> URL: https://issues.apache.org/jira/browse/SYSTEMML-951
> Project: SystemML
> Issue Type: Task
> Components: Runtime
> Reporter: Matthias Boehm
> Assignee: Matthias Boehm
> Attachments: mnist_softmax_v1.dml, mnist_softmax_v2.dml
>
>
> So far, all versions of the Spark right-indexing instructions require a full
> scan over the data set. In case of existing partitioning (which happens
> anyway for any external format to binary block conversion), such a full scan
> is unnecessary if we're only interested in a small subset of the data. This
> task adds an efficient right-indexing operation via 'rdd lookups', which
> accesses at most <num_lookup> partitions given an existing hash partitioning.
> cc [[email protected]]
> In detail, this task covers the following improvements for Spark matrix
> right indexing. Frames are not covered here because they allow
> variable-length blocks. Also, note that it is important to differentiate
> between in-core and out-of-core matrices: for in-core matrices (i.e.,
> matrices that fit in deserialized form into aggregated memory), the full
> scan is actually not problematic, as the filter operation only scans keys
> without touching the actual values.
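> For illustration, the in-core filter amounts to roughly the following
> (sketch; assumes {{data}} is the binary-block RDD, with bounds given as
> 1-based block indexes):
> {code:scala}
> import org.apache.spark.rdd.RDD
> import org.apache.sysml.runtime.matrix.data.{MatrixBlock, MatrixIndexes}
>
> // scan-based right indexing of a row-block range: the predicate inspects
> // only the keys and never touches the MatrixBlock values
> def filterRowBlocks(data: RDD[(MatrixIndexes, MatrixBlock)],
>     blkRl: Long, blkRu: Long): RDD[(MatrixIndexes, MatrixBlock)] =
>   data.filter { case (ix, _) =>
>     ix.getRowIndex >= blkRl && ix.getRowIndex <= blkRu }
> {code}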
> (1) Scan-based indexing w/o aggregation: So far, we have applied
> aggregations to merge partial blocks very conservatively. However, if the
> indexing range is block-aligned (e.g., the dimension starts at a block
> boundary, or the range lies within a single block), this is unnecessary.
> This alone led to a 2x improvement for indexing row batches out of an
> in-core matrix.
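> The alignment condition is roughly the following (sketch, for a 1-based row
> range {{[rl, ru]}} with block height {{brlen}}):
> {code:scala}
> // no partial-block merging is needed if the range starts on a block boundary
> // (every input block then maps to exactly one output block) or if the whole
> // range falls within a single block
> def isBlockAligned(rl: Long, ru: Long, brlen: Int): Boolean =
>   (rl - 1) % brlen == 0 || (rl - 1) / brlen == (ru - 1) / brlen
> {code}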
> (2) Single-block lookup: If the indexing range covers a subrange of a single
> block, we directly perform a lookup. On in-core matrices this gives a minor
> improvement (but does not hurt), while on out-of-core matrices the
> improvement is huge in case of an existing partitioner, as we only have to
> scan a single partition instead of the entire data set.
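> A rough sketch of the single-block lookup (assumes {{data}} is the
> binary-block RDD with an existing partitioner):
> {code:scala}
> // with an existing partitioner, lookup() runs a job over only the single
> // partition that can contain the key, instead of scanning all partitions
> val key = new MatrixIndexes(5, 1)  // block row/column indexes (1-based)
> val blks = data.lookup(key)        // at most one partition is scanned
> {code}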
> (3) Multi-block lookups: Unfortunately, Spark does not provide a lookup for
> a list of keys. The next best option is a data-query join (in case of an
> existing partitioner) with {{data.join(filter).map()}}, which works very
> well for in-core data sets but, for out-of-core data sets, unfortunately
> does not exploit the potential for partition pruning and thus reads the
> entire data. I also experimented with a custom multi-block lookup that runs
> multiple lookups in a multi-threaded fashion -- this gave the expected
> pruning but was very ugly due to an unbounded number of jobs.
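> The data-query join mentioned above looks roughly as follows (sketch;
> {{keys}} is a hypothetical list of requested block indexes):
> {code:scala}
> // build a small RDD of requested keys, co-partition it with the data, and
> // join; with a matching partitioner this avoids shuffling the data, but it
> // still scans all data partitions (no partition pruning)
> val filter = sc.parallelize(keys.map(ix => (ix, ())))
>   .partitionBy(data.partitioner.get)
> val out = data.join(filter).map { case (ix, (blk, _)) => (ix, blk) }
> {code}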
> In conclusion, I'll create a patch for scenarios (1) and (2), while scenario
> (3) requires some more thought and is postponed until after the 0.11
> release. One idea would be to create a custom RDD that implements
> {{lookup(List<T> keys)}} by constructing a pruned set of input partitions
> via {{partitioner.getPartition(key)}}. cc [~freiss] [~niketanpansare]
> [~reinwald]
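> For example, such a pruned multi-key lookup could be sketched with Spark's
> {{PartitionPruningRDD}} (hypothetical helper, not a worked-out
> implementation):
> {code:scala}
> import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
> import org.apache.sysml.runtime.matrix.data.MatrixIndexes
>
> def lookupAll[V](data: RDD[(MatrixIndexes, V)],
>     keys: Set[MatrixIndexes]): Array[(MatrixIndexes, V)] = {
>   val part = data.partitioner.get          // requires an existing partitioner
>   val parts = keys.map(part.getPartition)  // pruned set of input partitions
>   PartitionPruningRDD.create(data, parts.contains)  // scan only those
>     .filter { case (ix, _) => keys.contains(ix) }
>     .collect()
> }
> {code}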