[SYSTEMML-2250] Multi-threaded parfor result merge (per block)

This patch uses the common thread pool to merge parfor worker results
into dense result matrices in a multi-threaded manner during local
in-memory result merge. This reduces the serial fraction of parfor
programs, which is especially important on scale-up machines with a
high degree of parallelism.
 

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/dfc48ae3
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/dfc48ae3
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/dfc48ae3

Branch: refs/heads/master
Commit: dfc48ae3cca5725a547b724ee865a186650528c4
Parents: fdc5511
Author: Matthias Boehm <[email protected]>
Authored: Tue Apr 17 22:58:01 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Apr 17 22:58:01 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/parfor/ResultMerge.java      | 13 ++---
 .../parfor/ResultMergeLocalMemory.java          |  2 +-
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 60 ++++++++++++++------
 3 files changed, 49 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/dfc48ae3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
index be87b17..7b8a931 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMerge.java
@@ -83,19 +83,35 @@ public abstract class ResultMerge implements Serializable
         */
        public abstract MatrixObject executeParallelMerge( int par );
        
-       /**
-        * ?
-        * 
-        * @param out initially empty block
-        * @param in input matrix block
-        * @param appendOnly ?
-        */
+       /**
+        * Merges {@code in} into {@code out} without compare block; equivalent
+        * to the four-argument overload with {@code par=false}.
+        * 
+        * @param out output block, modified in place
+        * @param in input matrix block
+        * @param appendOnly passed through to {@code MatrixBlock.merge}
+        */
        protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, 
boolean appendOnly ) {
+               mergeWithoutComp(out, in, appendOnly, false);
+       }
+       
+       /**
+        * Merges {@code in} into {@code out} without compare block: values are
+        * added via {@code binaryOperationsInPlace} if this result merge accumulates
+        * ({@code _isAccum}), and merged via {@code MatrixBlock.merge} otherwise.
+        * 
+        * @param out output block, modified in place
+        * @param in input matrix block
+        * @param appendOnly passed through to {@code MatrixBlock.merge}; unused when accumulating
+        * @param par allow a multi-threaded {@code MatrixBlock.merge}; currently
+        *        ignored when accumulating (binaryOperationsInPlace is called as is)
+        */
+       protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, 
boolean appendOnly, boolean par ) {
                //pass through to matrix block operations
                if( _isAccum )
                        out.binaryOperationsInPlace(PLUS, in);
                else
-                       out.merge(in, appendOnly);
+                       out.merge(in, appendOnly, par);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/dfc48ae3/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index e3a2e82..c76b3f9 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -262,7 +262,7 @@ public class ResultMergeLocalMemory extends ResultMerge
         */
        private void merge( MatrixBlock out, MatrixBlock in, boolean appendOnly 
) {
                if( _compare == null )
-                       mergeWithoutComp(out, in, appendOnly);
+                       mergeWithoutComp(out, in, appendOnly, true); //par=true: dense outputs use the multi-threaded merge (accumulation path unaffected)
                else
                        mergeWithComp(out, in, _compare);
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/dfc48ae3/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 7fab225..5ce4963 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.stream.IntStream;
 
 import org.apache.commons.math3.random.Well1024a;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -1578,6 +1579,10 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         * @param appendOnly ?
         */
        public void merge(MatrixBlock that, boolean appendOnly) {
+               merge(that, appendOnly, false); //par=false: dense targets use the serial mergeIntoDense
+       }
+       
+       public void merge(MatrixBlock that, boolean appendOnly, boolean par) {
                //check for empty input source (nothing to merge)
                if( that == null || that.isEmptyBlock(false) )
                        return;
@@ -1599,6 +1604,8 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                long nnz = nonZeros + that.nonZeros;
                if( sparse )
                        mergeIntoSparse(that, appendOnly);
+               else if( par )
+                       mergeIntoDensePar(that);
                else
                        mergeIntoDense(that);
                
@@ -1608,35 +1615,72 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
 
        private void mergeIntoDense(MatrixBlock that)
        {
-               if( that.sparse ) //DENSE <- SPARSE
-               {
+               if( that.sparse ) { //DENSE <- SPARSE
                        double[] a = getDenseBlockValues();
                        SparseBlock b = that.sparseBlock;
                        int m = rlen;
                        int n = clen;
-                       
-                       for( int i=0, aix=0; i<m; i++, aix+=n )
-                               if( !b.isEmpty(i) )
-                               {
-                                       int bpos = b.pos(i);
-                                       int blen = b.size(i);
-                                       int[] bix = b.indexes(i);
-                                       double[] bval = b.values(i);
-                                       for( int j=bpos; j<bpos+blen; j++ )
-                                               if( bval[j] != 0 )
-                                                       a[ aix + bix[j] ] = 
bval[j];
-                               }
+                       for( int i=0, aix=0; i<m; i++, aix+=n ) {
+                               if( b.isEmpty(i) ) continue;
+                               int bpos = b.pos(i);
+                               int blen = b.size(i);
+                               int[] bix = b.indexes(i);
+                               double[] bval = b.values(i);
+                               for( int j=bpos; j<bpos+blen; j++ )
+                                       if( bval[j] != 0 )
+                                               a[ aix + bix[j] ] = bval[j];
+                       }
                }
-               else //DENSE <- DENSE
-               {
+               else { //DENSE <- DENSE
                        double[] a = getDenseBlockValues();
                        double[] b = that.getDenseBlockValues();
                        int len = rlen * clen;
-                       
                        for( int i=0; i<len; i++ )
                                a[i] = ( b[i] != 0 ) ? b[i] : a[i];
                }
        }
+       
+       /**
+        * Multi-threaded variant of {@link #mergeIntoDense(MatrixBlock)}: copies the
+        * non-zero values of the given block into this dense block using parallel
+        * streams over rows (sparse input) or cells (dense input); zero input values
+        * never overwrite existing values.
+        * NOTE(review): there is no size threshold here, so even small blocks pay
+        * the parallel-stream overhead; consider a serial fallback for small inputs.
+        * 
+        * @param that matrix block to merge into this dense block
+        */
+       private void mergeIntoDensePar(MatrixBlock that)
+       {
+               if( that.sparse ) { //DENSE <- SPARSE
+                       double[] a = getDenseBlockValues();
+                       SparseBlock b = that.sparseBlock;
+                       //one task per row i, writing only a[i*clen, (i+1)*clen) (assuming column indexes < clen)
+                       IntStream.range(0, rlen).parallel().forEach(i -> {
+                               if( b.isEmpty(i) ) return;
+                               int aix = i*clen;
+                               int bpos = b.pos(i);
+                               int blen = b.size(i);
+                               int[] bix = b.indexes(i);
+                               double[] bval = b.values(i);
+                               for( int j=bpos; j<bpos+blen; j++ )
+                                       if( bval[j] != 0 )
+                                               a[ aix + bix[j] ] = bval[j];
+                       });
+               }
+               else { //DENSE <- DENSE
+                       double[] a = getDenseBlockValues();
+                       double[] b = that.getDenseBlockValues();
+                       //bound by rlen*clen (as in mergeIntoDense) rather than a.length: the
+                       //backing arrays are not known here to be exactly rlen*clen long, and
+                       //iterating over a.length could read past the end of b
+                       int len = rlen * clen;
+                       IntStream.range(0, len).parallel().forEach(i -> {
+                               if( b[i] != 0 )
+                                       a[i] = b[i];
+                       });
+               }
+       }
 
        private void mergeIntoSparse(MatrixBlock that, boolean appendOnly)
        {

Reply via email to