Repository: systemml
Updated Branches:
  refs/heads/master 5ef4f5ac2 -> 094f5551d


[SYSTEMML-2016] Performance multi-threaded matrix multiply thresholds

Recently, we've introduced a common thread pool for all multi-threaded
operations, which eliminated the costs for thread pool creation per
operation. Therefore, multi-threading is now beneficial for smaller data
sizes. This patch accordingly updates the size and FLOP thresholds used
for matrix multiply (mm) and mmchain operations. In addition, we now
also use a slightly more efficient approach for aggregating partial
results for mmchain.

For example, on a scenario of 10,000 operations, this patch improved the
cumulative runtime for [1Kx1K] mm [1Kx1] from 8.1s to 1.7s, and for
[500x1K] mmchain [1Kx1] from 6.5s to 4s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/665d037e
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/665d037e
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/665d037e

Branch: refs/heads/master
Commit: 665d037e68d1ac1f359c64ee36d5395ad704a63e
Parents: 5ef4f5a
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 29 16:53:08 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 29 16:53:08 2018 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofRowwise.java     |  2 +-
 .../runtime/matrix/data/LibMatrixMult.java      | 76 +++++++++++++-------
 2 files changed, 53 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/665d037e/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index a4e2e7a..7454f78 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -174,7 +174,7 @@ public abstract class SpoofRowwise extends SpoofOperator
        public MatrixBlock execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k)
        {
                //redirect to serial execution
-               if( k <= 1 || (_type.isColumnAgg() && 
!LibMatrixMult.checkParColumnAgg(inputs.get(0), k, false))
+               if( k <= 1 || (_type.isColumnAgg() && 
!LibMatrixMult.satisfiesMultiThreadingConstraints(inputs.get(0), k))
                        || getTotalInputSize(inputs) < PAR_NUMCELL_THRESHOLD ) {
                        return execute(inputs, scalarObjects, out);
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/665d037e/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index 1eef244..a98cd79 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -36,6 +36,7 @@ import org.apache.sysml.lops.WeightedSigmoid.WSigmoidType;
 import org.apache.sysml.lops.WeightedSquaredLoss.WeightsType;
 import org.apache.sysml.lops.WeightedUnaryMM.WUMMType;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.functionobjects.SwapIndex;
 import org.apache.sysml.runtime.functionobjects.ValueFunction;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
@@ -56,7 +57,8 @@ public class LibMatrixMult
        //internal configuration
        private static final boolean LOW_LEVEL_OPTIMIZATION = true;
        private static final long MEM_OVERHEAD_THRESHOLD = 2L*1024*1024; //MAX 
2 MB
-       private static final long PAR_MINFLOP_THRESHOLD = 2L*1024*1024; //MIN 2 
MFLOP
+       private static final long PAR_MINFLOP_THRESHOLD1 = 2L*1024*1024; //MIN 
2 MFLOP
+       private static final long PAR_MINFLOP_THRESHOLD2 = 128L*1024; //MIN 
128 KFLOP
        private static final int L2_CACHESIZE = 256 *1024; //256KB (common size)
        
        private LibMatrixMult() {
@@ -164,11 +166,8 @@ public class LibMatrixMult
                        return;
                }
                
-               //check too high additional vector-matrix memory requirements 
(fallback to sequential)
-               //check too small workload in terms of flops (fallback to 
sequential too)
-               if( m1.rlen == 1 && (8L * m2.clen * k > MEM_OVERHEAD_THRESHOLD 
|| !LOW_LEVEL_OPTIMIZATION || m2.clen==1 || m1.isUltraSparse() || 
m2.isUltraSparse()) 
-                       || 2L * m1.rlen * m1.clen * m2.clen < 
PAR_MINFLOP_THRESHOLD ) 
-               { 
+               //check too small workload and fallback to sequential if needed
+               if( !satisfiesMultiThreadingConstraints(m1, m2, m1.rlen==1, 
true, 2, k) ) {
                        matrixMult(m1, m2, ret);
                        return;
                }
@@ -286,9 +285,8 @@ public class LibMatrixMult
                        return;
                }
                
-               //check too high additional memory requirements (fallback to 
sequential)
-               //check too small workload in terms of flops (fallback to 
sequential too)
-               if( !checkParColumnAgg(mX, k, true) ) { 
+               //check temporary memory and too small workload for 
multi-threading
+               if( !satisfiesMultiThreadingConstraints(mX, true, true, 
mX.sparse?2:4, k) ) { 
                        matrixMultChain(mX, mV, mW, ret, ct);
                        return;
                }
@@ -307,12 +305,13 @@ public class LibMatrixMult
                        ArrayList<MatrixMultChainTask> tasks = new 
ArrayList<>();
                        for( int i=0, lb=0; i<blklens.size(); 
lb+=blklens.get(i), i++ )
                                tasks.add(new MatrixMultChainTask(mX, mV, mW, 
ct, lb, lb+blklens.get(i)));
-                       //execute tasks
                        List<Future<double[]>> taskret = pool.invokeAll(tasks);
                        pool.shutdown();
-                       //aggregate partial results
-                       for( Future<double[]> task : taskret )
-                               vectAdd(task.get(), ret.getDenseBlockValues(), 
0, 0, mX.clen);
+                       //aggregate partial results with error handling
+                       double[][] a = new double[taskret.size()][];
+                       for(int i=0; i<taskret.size(); i++)
+                               a[i] = taskret.get(i).get();
+                       vectAddAll(a, ret.getDenseBlockValues(), 0, 0, mX.clen);
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
@@ -360,12 +359,8 @@ public class LibMatrixMult
                        return;
                }
                
-               //check no parallelization benefit (fallback to sequential)
-               //check too small workload in terms of flops (fallback to 
sequential too)
-               if( ret.rlen == 1 || k <= 1
-                       || leftTranspose && 1L * m1.rlen * m1.clen * m1.clen < 
PAR_MINFLOP_THRESHOLD
-                       || !leftTranspose && 1L * m1.clen * m1.rlen * m1.rlen < 
PAR_MINFLOP_THRESHOLD) 
-               { 
+               //check too small workload and fallback to sequential if 
necessary
+               if( !satisfiesMultiThreadingConstraintsTSMM(m1, leftTranspose, 
1, k) ) {
                        matrixMultTransposeSelf(m1, ret, leftTranspose);
                        return;
                }
@@ -3354,6 +3349,16 @@ public class LibMatrixMult
                }
        }
        
+       private static void vectAddAll(double[][] a, double[] c, int ai, int 
ci, final int len) {
+               int bi = a.length % 4;
+               //process stride for remaining blocks
+               for(int i=0; i<bi; i++)
+                       vectAdd(a[i], c, ai, ci, len);
+               //process stride in 4 blocks at a time
+               for(int i=bi; i<a.length; i+=4)
+                       vectAdd4(a[i], a[i+1], a[i+2], a[i+3], c, ai, ci, len);
+       }
+       
        public static void vectAddInPlace(double aval, double[] c, final int 
ci, final int len) {
                final int bn = len%8;
                //rest, not aligned to 8-blocks
@@ -3609,11 +3614,6 @@ public class LibMatrixMult
                        && m2clen < 64 && (!inclCacheSize || 8*m2rlen*m2clen < 
L2_CACHESIZE);
        }
        
-       public static boolean checkParColumnAgg(MatrixBlock m1, int k, boolean 
inclFLOPs) {
-               return (8L * m1.clen * k <= MEM_OVERHEAD_THRESHOLD 
-                       && (!inclFLOPs || 4L * m1.rlen * m1.clen / 
(m1.sparse?2:1) >= PAR_MINFLOP_THRESHOLD));
-       }
-       
        private static boolean checkParMatrixMultRightInputRows( MatrixBlock 
m1, MatrixBlock m2, int k ) {
                //parallelize over rows in rhs matrix if number of rows in 
lhs/output is very small
                return (m1.rlen==1 && LOW_LEVEL_OPTIMIZATION && m2.clen>1 && 
!(m1.isUltraSparse()||m2.isUltraSparse()))
@@ -3629,6 +3629,34 @@ public class LibMatrixMult
                                && 8*m1.rlen*m1.clen < 256*1024 ); //lhs fits 
in L2 cache
        }
        
+       public static boolean satisfiesMultiThreadingConstraints(MatrixBlock 
m1, int k) {
+               return satisfiesMultiThreadingConstraints(m1, true, false, -1, 
k);
+       }
+       
+       public static boolean satisfiesMultiThreadingConstraints(MatrixBlock 
m1, boolean checkMem, boolean checkFLOPs, long FPfactor, int k) {
+               boolean sharedTP = 
(InfrastructureAnalyzer.getLocalParallelism() == k);
+               return k > 1 && LOW_LEVEL_OPTIMIZATION
+                       && (!checkMem || 8L * m1.clen * k < 
MEM_OVERHEAD_THRESHOLD)
+                       && (!checkFLOPs || FPfactor * m1.rlen * m1.clen >
+                       (sharedTP ? PAR_MINFLOP_THRESHOLD2 : 
PAR_MINFLOP_THRESHOLD1));
+       }
+       
+       public static boolean satisfiesMultiThreadingConstraints(MatrixBlock 
m1, MatrixBlock m2, boolean checkMem, boolean checkFLOPs, long FPfactor, int k) 
{
+               boolean sharedTP = 
(InfrastructureAnalyzer.getLocalParallelism() == k);
+               return k > 1 && LOW_LEVEL_OPTIMIZATION
+                       && (!checkMem || 8L * m2.clen * k < 
MEM_OVERHEAD_THRESHOLD)
+                       && (!checkFLOPs || FPfactor * m1.rlen * m1.clen * 
m2.clen >
+                       (sharedTP ? PAR_MINFLOP_THRESHOLD2 : 
PAR_MINFLOP_THRESHOLD1));
+       }
+       
+       private static boolean 
satisfiesMultiThreadingConstraintsTSMM(MatrixBlock m1, boolean leftTranspose, 
long FPfactor, int k) {
+               boolean sharedTP = 
(InfrastructureAnalyzer.getLocalParallelism() == k);
+               double threshold = sharedTP ? PAR_MINFLOP_THRESHOLD2 : 
PAR_MINFLOP_THRESHOLD1;
+               return k > 1 && LOW_LEVEL_OPTIMIZATION && 
(leftTranspose?m1.clen:m1.rlen)!=1
+                       && ((leftTranspose && FPfactor * m1.rlen * m1.clen * 
m1.clen > threshold)
+                       ||(!leftTranspose && FPfactor * m1.clen * m1.rlen * 
m1.rlen > threshold));
+       }
+       
        public static boolean isUltraSparseMatrixMult(MatrixBlock m1, 
MatrixBlock m2) {
                //note: ultra-sparse matrix mult implies also sparse outputs, 
hence we need
                //to be conservative and cannot use this for all ultra-sparse 
matrices.

Reply via email to