[SYSTEMML-2250] Multi-threaded parfor result merge (per block) This patch leverages the common thread pool to merge parfor worker results in a multi-threaded manner into dense result matrices in the context of local in-memory result merge. This reduces the serial fraction of parfor programs, which is especially important for scale-up machines with a large degree of parallelism.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/dfc48ae3 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/dfc48ae3 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/dfc48ae3 Branch: refs/heads/master Commit: dfc48ae3cca5725a547b724ee865a186650528c4 Parents: fdc5511 Author: Matthias Boehm <[email protected]> Authored: Tue Apr 17 22:58:01 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Apr 17 22:58:01 2018 -0700 ---------------------------------------------------------------------- .../controlprogram/parfor/ResultMerge.java | 13 ++--- .../parfor/ResultMergeLocalMemory.java | 2 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 60 ++++++++++++++------ 3 files changed, 49 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/dfc48ae3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java index be87b17..7b8a931 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java @@ -83,19 +83,16 @@ public abstract class ResultMerge implements Serializable */ public abstract MatrixObject executeParallelMerge( int par ); - /** - * ? - * - * @param out initially empty block - * @param in input matrix block - * @param appendOnly ? 
- */ protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) { + mergeWithoutComp(out, in, appendOnly, false); + } + + protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly, boolean par ) { //pass through to matrix block operations if( _isAccum ) out.binaryOperationsInPlace(PLUS, in); else - out.merge(in, appendOnly); + out.merge(in, appendOnly, par); } /** http://git-wip-us.apache.org/repos/asf/systemml/blob/dfc48ae3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java index e3a2e82..c76b3f9 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java @@ -262,7 +262,7 @@ public class ResultMergeLocalMemory extends ResultMerge */ private void merge( MatrixBlock out, MatrixBlock in, boolean appendOnly ) { if( _compare == null ) - mergeWithoutComp(out, in, appendOnly); + mergeWithoutComp(out, in, appendOnly, true); else mergeWithComp(out, in, _compare); } http://git-wip-us.apache.org/repos/asf/systemml/blob/dfc48ae3/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 7fab225..5ce4963 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; 
import java.util.Iterator; +import java.util.stream.IntStream; import org.apache.commons.math3.random.Well1024a; import org.apache.hadoop.io.DataInputBuffer; @@ -1578,6 +1579,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @param appendOnly ? */ public void merge(MatrixBlock that, boolean appendOnly) { + merge(that, appendOnly, false); + } + + public void merge(MatrixBlock that, boolean appendOnly, boolean par) { //check for empty input source (nothing to merge) if( that == null || that.isEmptyBlock(false) ) return; @@ -1599,6 +1604,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab long nnz = nonZeros + that.nonZeros; if( sparse ) mergeIntoSparse(that, appendOnly); + else if( par ) + mergeIntoDensePar(that); else mergeIntoDense(that); @@ -1608,35 +1615,54 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab private void mergeIntoDense(MatrixBlock that) { - if( that.sparse ) //DENSE <- SPARSE - { + if( that.sparse ) { //DENSE <- SPARSE double[] a = getDenseBlockValues(); SparseBlock b = that.sparseBlock; int m = rlen; int n = clen; - - for( int i=0, aix=0; i<m; i++, aix+=n ) - if( !b.isEmpty(i) ) - { - int bpos = b.pos(i); - int blen = b.size(i); - int[] bix = b.indexes(i); - double[] bval = b.values(i); - for( int j=bpos; j<bpos+blen; j++ ) - if( bval[j] != 0 ) - a[ aix + bix[j] ] = bval[j]; - } + for( int i=0, aix=0; i<m; i++, aix+=n ) { + if( b.isEmpty(i) ) continue; + int bpos = b.pos(i); + int blen = b.size(i); + int[] bix = b.indexes(i); + double[] bval = b.values(i); + for( int j=bpos; j<bpos+blen; j++ ) + if( bval[j] != 0 ) + a[ aix + bix[j] ] = bval[j]; + } } - else //DENSE <- DENSE - { + else { //DENSE <- DENSE double[] a = getDenseBlockValues(); double[] b = that.getDenseBlockValues(); int len = rlen * clen; - for( int i=0; i<len; i++ ) a[i] = ( b[i] != 0 ) ? 
b[i] : a[i]; } } + + private void mergeIntoDensePar(MatrixBlock that) + { + if( that.sparse ) { //DENSE <- SPARSE + double[] a = getDenseBlockValues(); + SparseBlock b = that.sparseBlock; + IntStream.range(0, rlen).parallel().forEach(i -> { + if( b.isEmpty(i) ) return; + int aix = i*clen; + int bpos = b.pos(i); + int blen = b.size(i); + int[] bix = b.indexes(i); + double[] bval = b.values(i); + for( int j=bpos; j<bpos+blen; j++ ) + if( bval[j] != 0 ) + a[ aix + bix[j] ] = bval[j]; + }); + } + else { //DENSE <- DENSE + double[] a = getDenseBlockValues(); + double[] b = that.getDenseBlockValues(); + Arrays.parallelSetAll(a, i -> (b[i]!=0) ? b[i] : a[i]); + } + } private void mergeIntoSparse(MatrixBlock that, boolean appendOnly) {
