Repository: systemml Updated Branches: refs/heads/master 5ef4f5ac2 -> 094f5551d
[SYSTEMML-2016] Performance multi-threaded matrix multiply thresholds Recently, we've introduced a common thread pool for all multi-threaded operations, which eliminated the costs for thread pool creation per operation. Therefore, multi-threading is now beneficial for smaller data sizes. This patch accordingly updates the size and FLOP thresholds used for matrix multiply (mm) and mmchain operations. In addition, we now also use a slightly more efficient approach for aggregating partial results for mmchain. For example, on a scenario of 10,000 operations, this patch improved the cumulative runtime for [1Kx1K] mm [1Kx1] from 8.1s to 1.7s, and for [500x1K] mmchain [1Kx1] from 6.5s to 4s. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/665d037e Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/665d037e Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/665d037e Branch: refs/heads/master Commit: 665d037e68d1ac1f359c64ee36d5395ad704a63e Parents: 5ef4f5a Author: Matthias Boehm <[email protected]> Authored: Thu Mar 29 16:53:08 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Mar 29 16:53:08 2018 -0700 ---------------------------------------------------------------------- .../sysml/runtime/codegen/SpoofRowwise.java | 2 +- .../runtime/matrix/data/LibMatrixMult.java | 76 +++++++++++++------- 2 files changed, 53 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/665d037e/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java index a4e2e7a..7454f78 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java +++ 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java @@ -174,7 +174,7 @@ public abstract class SpoofRowwise extends SpoofOperator public MatrixBlock execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k) { //redirect to serial execution - if( k <= 1 || (_type.isColumnAgg() && !LibMatrixMult.checkParColumnAgg(inputs.get(0), k, false)) + if( k <= 1 || (_type.isColumnAgg() && !LibMatrixMult.satisfiesMultiThreadingConstraints(inputs.get(0), k)) || getTotalInputSize(inputs) < PAR_NUMCELL_THRESHOLD ) { return execute(inputs, scalarObjects, out); } http://git-wip-us.apache.org/repos/asf/systemml/blob/665d037e/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java index 1eef244..a98cd79 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java @@ -36,6 +36,7 @@ import org.apache.sysml.lops.WeightedSigmoid.WSigmoidType; import org.apache.sysml.lops.WeightedSquaredLoss.WeightsType; import org.apache.sysml.lops.WeightedUnaryMM.WUMMType; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.functionobjects.SwapIndex; import org.apache.sysml.runtime.functionobjects.ValueFunction; import org.apache.sysml.runtime.matrix.operators.ReorgOperator; @@ -56,7 +57,8 @@ public class LibMatrixMult //internal configuration private static final boolean LOW_LEVEL_OPTIMIZATION = true; private static final long MEM_OVERHEAD_THRESHOLD = 2L*1024*1024; //MAX 2 MB - private static final long PAR_MINFLOP_THRESHOLD = 2L*1024*1024; //MIN 2 MFLOP + private static final long 
PAR_MINFLOP_THRESHOLD1 = 2L*1024*1024; //MIN 2 MFLOP + private static final long PAR_MINFLOP_THRESHOLD2 = 128L*1024; //MIN 128 KFLOP private static final int L2_CACHESIZE = 256 *1024; //256KB (common size) private LibMatrixMult() { @@ -164,11 +166,8 @@ public class LibMatrixMult return; } - //check too high additional vector-matrix memory requirements (fallback to sequential) - //check too small workload in terms of flops (fallback to sequential too) - if( m1.rlen == 1 && (8L * m2.clen * k > MEM_OVERHEAD_THRESHOLD || !LOW_LEVEL_OPTIMIZATION || m2.clen==1 || m1.isUltraSparse() || m2.isUltraSparse()) - || 2L * m1.rlen * m1.clen * m2.clen < PAR_MINFLOP_THRESHOLD ) - { + //check too small workload and fallback to sequential if needed + if( !satisfiesMultiThreadingConstraints(m1, m2, m1.rlen==1, true, 2, k) ) { matrixMult(m1, m2, ret); return; } @@ -286,9 +285,8 @@ public class LibMatrixMult return; } - //check too high additional memory requirements (fallback to sequential) - //check too small workload in terms of flops (fallback to sequential too) - if( !checkParColumnAgg(mX, k, true) ) { + //check temporary memory and too small workload for multi-threading + if( !satisfiesMultiThreadingConstraints(mX, true, true, mX.sparse?2:4, k) ) { matrixMultChain(mX, mV, mW, ret, ct); return; } @@ -307,12 +305,13 @@ public class LibMatrixMult ArrayList<MatrixMultChainTask> tasks = new ArrayList<>(); for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ ) tasks.add(new MatrixMultChainTask(mX, mV, mW, ct, lb, lb+blklens.get(i))); - //execute tasks List<Future<double[]>> taskret = pool.invokeAll(tasks); pool.shutdown(); - //aggregate partial results - for( Future<double[]> task : taskret ) - vectAdd(task.get(), ret.getDenseBlockValues(), 0, 0, mX.clen); + //aggregate partial results and error handling + double[][] a = new double[taskret.size()][]; + for(int i=0; i<taskret.size(); i++) + a[i] = taskret.get(i).get(); + vectAddAll(a, ret.getDenseBlockValues(), 0, 0, mX.clen); } 
catch(Exception ex) { throw new DMLRuntimeException(ex); @@ -360,12 +359,8 @@ public class LibMatrixMult return; } - //check no parallelization benefit (fallback to sequential) - //check too small workload in terms of flops (fallback to sequential too) - if( ret.rlen == 1 || k <= 1 - || leftTranspose && 1L * m1.rlen * m1.clen * m1.clen < PAR_MINFLOP_THRESHOLD - || !leftTranspose && 1L * m1.clen * m1.rlen * m1.rlen < PAR_MINFLOP_THRESHOLD) - { + //check too small workload and fallback to sequential if necessary + if( !satisfiesMultiThreadingConstraintsTSMM(m1, leftTranspose, 1, k) ) { matrixMultTransposeSelf(m1, ret, leftTranspose); return; } @@ -3354,6 +3349,16 @@ public class LibMatrixMult } } + private static void vectAddAll(double[][] a, double[] c, int ai, int ci, final int len) { + int bi = a.length % 4; + //process stride for remaining blocks + for(int i=0; i<bi; i++) + vectAdd(a[i], c, ai, ci, len); + //process stride in 4 blocks at a time + for(int i=bi; i<a.length; i+=4) + vectAdd4(a[i], a[i+1], a[i+2], a[i+3], c, ai, ci, len); + } + public static void vectAddInPlace(double aval, double[] c, final int ci, final int len) { final int bn = len%8; //rest, not aligned to 8-blocks @@ -3609,11 +3614,6 @@ public class LibMatrixMult && m2clen < 64 && (!inclCacheSize || 8*m2rlen*m2clen < L2_CACHESIZE); } - public static boolean checkParColumnAgg(MatrixBlock m1, int k, boolean inclFLOPs) { - return (8L * m1.clen * k <= MEM_OVERHEAD_THRESHOLD - && (!inclFLOPs || 4L * m1.rlen * m1.clen / (m1.sparse?2:1) >= PAR_MINFLOP_THRESHOLD)); - } - private static boolean checkParMatrixMultRightInputRows( MatrixBlock m1, MatrixBlock m2, int k ) { //parallelize over rows in rhs matrix if number of rows in lhs/output is very small return (m1.rlen==1 && LOW_LEVEL_OPTIMIZATION && m2.clen>1 && !(m1.isUltraSparse()||m2.isUltraSparse())) @@ -3629,6 +3629,34 @@ public class LibMatrixMult && 8*m1.rlen*m1.clen < 256*1024 ); //lhs fits in L2 cache } + public static boolean 
satisfiesMultiThreadingConstraints(MatrixBlock m1, int k) { + return satisfiesMultiThreadingConstraints(m1, true, false, -1, k); + } + + public static boolean satisfiesMultiThreadingConstraints(MatrixBlock m1, boolean checkMem, boolean checkFLOPs, long FPfactor, int k) { + boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k); + return k > 1 && LOW_LEVEL_OPTIMIZATION + && (!checkMem || 8L * m1.clen * k < MEM_OVERHEAD_THRESHOLD) + && (!checkFLOPs || FPfactor * m1.rlen * m1.clen > + (sharedTP ? PAR_MINFLOP_THRESHOLD2 : PAR_MINFLOP_THRESHOLD1)); + } + + public static boolean satisfiesMultiThreadingConstraints(MatrixBlock m1, MatrixBlock m2, boolean checkMem, boolean checkFLOPs, long FPfactor, int k) { + boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k); + return k > 1 && LOW_LEVEL_OPTIMIZATION + && (!checkMem || 8L * m2.clen * k < MEM_OVERHEAD_THRESHOLD) + && (!checkFLOPs || FPfactor * m1.rlen * m1.clen * m2.clen > + (sharedTP ? PAR_MINFLOP_THRESHOLD2 : PAR_MINFLOP_THRESHOLD1)); + } + + private static boolean satisfiesMultiThreadingConstraintsTSMM(MatrixBlock m1, boolean leftTranspose, long FPfactor, int k) { + boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k); + double threshold = sharedTP ? PAR_MINFLOP_THRESHOLD2 : PAR_MINFLOP_THRESHOLD1; + return k > 1 && LOW_LEVEL_OPTIMIZATION && (leftTranspose?m1.clen:m1.rlen)!=1 + && ((leftTranspose && FPfactor * m1.rlen * m1.clen * m1.clen > threshold) + ||(!leftTranspose && FPfactor * m1.clen * m1.rlen * m1.rlen > threshold)); + } + public static boolean isUltraSparseMatrixMult(MatrixBlock m1, MatrixBlock m2) { //note: ultra-sparse matrix mult implies also sparse outputs, hence we need //to be conservative and cannot use this for all ultra-sparse matrices.
