Repository: incubator-systemml Updated Branches: refs/heads/master e81b9059e -> 19e46bb17
[SYSTEMML-1367] Performance parfor-dpesp row input handling Recently we added the ability to pass entire rows to parfor datapartition-execute without the need for grouping. This applies to rows smaller than the blocksize or input datasets with arbitrarily large rows. This patch improves performance for these cases by avoiding unnecessary copies of single fragment partitions to the target working partition. Furthermore, it reduces memory pressure as it reduces the working set size by #cores * partition size. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/19e46bb1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/19e46bb1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/19e46bb1 Branch: refs/heads/master Commit: 19e46bb171cf4b2abe05f841040de2f136b97b4f Parents: e81b905 Author: Matthias Boehm <[email protected]> Authored: Tue Mar 14 00:10:18 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Mar 14 00:11:25 2017 -0700 ---------------------------------------------------------------------- .../controlprogram/parfor/RemoteDPParForSparkWorker.java | 9 +++++++++ 1 file changed, 9 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/19e46bb1/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 637d11f..400fbd5 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -21,6 
+21,7 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; @@ -198,6 +199,14 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF private MatrixBlock collectBinaryBlock( Iterable<Writable> valueList, MatrixBlock reuse ) throws IOException { + //fast path for partition of single fragment (see pseudo grouping), + //which avoids unnecessary copies and reduces memory pressure + if( valueList instanceof Collection && ((Collection<Writable>)valueList).size()==1 ) { + return ((PairWritableBlock)valueList.iterator().next()).block; + } + + //default: create or reuse target partition and copy individual partition fragments + //into this target, including nnz maintenance and potential dense-sparse format change MatrixBlock partition = reuse; try
