Repository: systemml Updated Branches: refs/heads/master 88a52eb04 -> 53b489ced
[SYSTEMML-2212] Performance multi-threaded unary aggregates This patch makes two performance improvements for unary aggregates such as sum(X), rowSums(X), and colSums(X). First, we now use data-size thresholds for multi-threading that are aware of the shared thread pool (introduced in SYSTEMML-2193), which makes multi-threading beneficial for much smaller data sizes. Second, we use the task partitioning primitives from matrix multiplication for better load balance. On a scenario of 10,000 sum(X) operations, the cumulative execution time improved as follows for different data sizes: 1) 1K x 1K (8MB): 59.7s -> 4.4s 2) 2K x 1K (16MB): 5.8s -> 5.8s 3) 10K x 1K (80MB): 17.7s -> 12.4s Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8693ae65 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8693ae65 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8693ae65 Branch: refs/heads/master Commit: 8693ae65169046cf545c1b93f75b7f4cc0ce6541 Parents: 88a52eb Author: Matthias Boehm <[email protected]> Authored: Sun Mar 25 21:19:47 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sun Mar 25 21:19:47 2018 -0700 ---------------------------------------------------------------------- .../sysml/runtime/codegen/SpoofRowwise.java | 5 +-- .../sysml/runtime/matrix/data/LibMatrixAgg.java | 38 +++++++++++--------- .../runtime/matrix/data/LibMatrixMult.java | 30 ++-------------- .../sysml/runtime/util/UtilFunctions.java | 26 ++++++++++++++ 4 files changed, 53 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java index 6e5646d..a4e2e7a 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java @@ -41,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.data.SparseRow; import org.apache.sysml.runtime.matrix.data.SparseRowVector; import org.apache.sysml.runtime.util.CommonThreadPool; +import org.apache.sysml.runtime.util.UtilFunctions; public abstract class SpoofRowwise extends SpoofOperator @@ -200,8 +201,8 @@ public abstract class SpoofRowwise extends SpoofOperator //core parallel execute ExecutorService pool = CommonThreadPool.get(k); ArrayList<Integer> blklens = (a instanceof CompressedMatrixBlock) ? - LibMatrixMult.getAlignedBlockSizes(m, k, BitmapEncoder.BITMAP_BLOCK_SZ) : - LibMatrixMult.getBalancedBlockSizesDefault(m, k, (long)m*n<16*PAR_NUMCELL_THRESHOLD); + UtilFunctions.getAlignedBlockSizes(m, k, BitmapEncoder.BITMAP_BLOCK_SZ) : + UtilFunctions.getBalancedBlockSizesDefault(m, k, (long)m*n<16*PAR_NUMCELL_THRESHOLD); try { http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java index f99accb..2d81255 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; +import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.functionobjects.Builtin; import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; import org.apache.sysml.runtime.functionobjects.CM; @@ -81,7 +82,8 @@ public class LibMatrixAgg { //internal configuration parameters private static final boolean NAN_AWARENESS = false; - private static final long PAR_NUMCELL_THRESHOLD = 1024*1024; //Min 1M elements + private static final long PAR_NUMCELL_THRESHOLD1 = 1024*1024; //Min 1M elements + private static final long PAR_NUMCELL_THRESHOLD2 = 16*1024; //Min 16K elements private static final long PAR_INTERMEDIATE_SIZE_THRESHOLD = 2*1024*1024; //Max 2MB //////////////////////////////// @@ -209,9 +211,7 @@ public class LibMatrixAgg public static void aggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, AggregateUnaryOperator uaop, int k) { //fall back to sequential version if necessary - if( k <= 1 || (long)in.nonZeros < PAR_NUMCELL_THRESHOLD || in.rlen <= k/2 - || (!(uaop.indexFn instanceof ReduceCol) && out.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD ) || - !out.isThreadSafe()) { + if( !satisfiesMultiThreadingConstraints(in, out, uaop, k) ) { aggregateUnaryMatrix(in, out, uaop); return; } @@ -241,11 +241,12 @@ public class LibMatrixAgg try { ExecutorService pool = CommonThreadPool.get(k); ArrayList<AggTask> tasks = new ArrayList<>(); - int blklen = (int)(Math.ceil((double)m/k)); - for( int i=0; i<k & i*blklen<m; i++ ) { + ArrayList<Integer> blklens = UtilFunctions.getBalancedBlockSizesDefault(m, k, + (uaop.indexFn instanceof ReduceRow)); //use static partitioning for col*() + for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ ) { tasks.add( (uaop.indexFn instanceof ReduceCol) ? 
- new RowAggTask(in, out, aggtype, uaop, i*blklen, Math.min((i+1)*blklen, m)) : - new PartialAggTask(in, out, aggtype, uaop, i*blklen, Math.min((i+1)*blklen, m)) ); + new RowAggTask(in, out, aggtype, uaop, lb, lb+blklens.get(i)) : + new PartialAggTask(in, out, aggtype, uaop, lb, lb+blklens.get(i)) ); } pool.invokeAll(tasks); pool.shutdown(); @@ -259,7 +260,7 @@ public class LibMatrixAgg catch(Exception ex) { throw new DMLRuntimeException(ex); } - + //cleanup output and change representation (if necessary) out.recomputeNonZeros(); out.examSparsity(); @@ -303,7 +304,7 @@ public class LibMatrixAgg AggregateUnaryOperator uaop = InstructionUtils.parseBasicCumulativeAggregateUnaryOperator(uop); //fall back to sequential if necessary or agg not supported - if( k <= 1 || (long)in.rlen*in.clen < PAR_NUMCELL_THRESHOLD || in.rlen <= k + if( k <= 1 || (long)in.rlen*in.clen < PAR_NUMCELL_THRESHOLD1 || in.rlen <= k || out.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD || uaop == null || !out.isThreadSafe()) { return cumaggregateUnaryMatrix(in, out, uop); } @@ -407,7 +408,7 @@ public class LibMatrixAgg public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op, int k) { //fall back to sequential version if necessary - if( k <= 1 || in1.nonZeros+in2.nonZeros < PAR_NUMCELL_THRESHOLD || in1.rlen <= k/2 + if( k <= 1 || in1.nonZeros+in2.nonZeros < PAR_NUMCELL_THRESHOLD1 || in1.rlen <= k/2 || (!(op.indexFn instanceof ReduceCol) && ret.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD) ) { return aggregateTernary(in1, in2, in3, ret, op); } @@ -479,7 +480,7 @@ public class LibMatrixAgg { //fall back to sequential version if necessary boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1); - if( k <= 1 || (long)target.rlen*target.clen < PAR_NUMCELL_THRESHOLD || rowVector || target.clen==1) { + if( k <= 1 || (long)target.rlen*target.clen < PAR_NUMCELL_THRESHOLD1 || rowVector || target.clen==1) { 
groupedAggregate(groups, target, weights, result, numGroups, op); return; } @@ -514,18 +515,23 @@ public class LibMatrixAgg result.examSparsity(); } - public static boolean isSupportedUnaryAggregateOperator( AggregateUnaryOperator op ) - { + public static boolean isSupportedUnaryAggregateOperator( AggregateUnaryOperator op ) { AggType type = getAggType( op ); return (type != AggType.INVALID); } - public static boolean isSupportedUnaryOperator( UnaryOperator op ) - { + public static boolean isSupportedUnaryOperator( UnaryOperator op ) { AggType type = getAggType( op ); return (type != AggType.INVALID); } + public static boolean satisfiesMultiThreadingConstraints(MatrixBlock in, MatrixBlock out, AggregateUnaryOperator uaop, int k) { + boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k); + return k > 1 && out.isThreadSafe() && in.rlen > (sharedTP ? k/8 : k/2) + && (uaop.indexFn instanceof ReduceCol || out.clen*8*k < PAR_INTERMEDIATE_SIZE_THRESHOLD) //size + && in.nonZeros > (sharedTP ? PAR_NUMCELL_THRESHOLD2 : PAR_NUMCELL_THRESHOLD1); + } + /** * Recompute outputs (e.g., maxindex or minindex) according to block indexes from MR. * TODO: this should not be part of block operations but of the MR instruction. 
http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java index e7290a8..1eef244 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java @@ -198,7 +198,7 @@ public class LibMatrixMult try { ExecutorService pool = CommonThreadPool.get(k); ArrayList<MatrixMultTask> tasks = new ArrayList<>(); - ArrayList<Integer> blklens = getBalancedBlockSizesDefault(num, k, (pm2r||pm2c)); + ArrayList<Integer> blklens = UtilFunctions.getBalancedBlockSizesDefault(num, k, (pm2r||pm2c)); for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ ) tasks.add(new MatrixMultTask(m1, m2, ret, tm2, pm2r, pm2c, lb, lb+blklens.get(i))); //execute tasks @@ -303,7 +303,7 @@ public class LibMatrixMult //(currently: always parallelization over number of rows) try { ExecutorService pool = CommonThreadPool.get(k); - ArrayList<Integer> blklens = getBalancedBlockSizesDefault(mX.rlen, k, true); + ArrayList<Integer> blklens = UtilFunctions.getBalancedBlockSizesDefault(mX.rlen, k, true); ArrayList<MatrixMultChainTask> tasks = new ArrayList<>(); for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ ) tasks.add(new MatrixMultChainTask(mX, mV, mW, ct, lb, lb+blklens.get(i))); @@ -3706,32 +3706,6 @@ public class LibMatrixMult } } - - public static ArrayList<Integer> getBalancedBlockSizesDefault(int len, int k, boolean constK) { - int nk = constK ? 
k : UtilFunctions.roundToNext(Math.min(8*k,len/32), k); - return getBalancedBlockSizes(len, nk); - } - - public static ArrayList<Integer> getAlignedBlockSizes(int len, int k, int align) { - int blklen = (int)(Math.ceil((double)len/k)); - blklen += ((blklen%align != 0) ? align-blklen%align : 0); - ArrayList<Integer> ret = new ArrayList<>(); - for(int i=0; i<len; i+=blklen) - ret.add(Math.min(blklen, len-i)); - return ret; - } - - private static ArrayList<Integer> getBalancedBlockSizes(int len, int k) { - ArrayList<Integer> ret = new ArrayList<>(); - int base = len / k; - int rest = len % k; - for( int i=0; i<k; i++ ) { - int val = base + (i<rest?1:0); - if( val > 0 ) - ret.add(val); - } - return ret; - } ///////////////////////////////////////////////////////// // Task Implementations for Multi-Threaded Operations // http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index e0ac955..f0911bb 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -150,6 +150,32 @@ public class UtilFunctions return (int)Math.min(blockSize, remain); } + public static ArrayList<Integer> getBalancedBlockSizesDefault(int len, int k, boolean constK) { + int nk = constK ? k : roundToNext(Math.min(8*k,len/32), k); + return getBalancedBlockSizes(len, nk); + } + + public static ArrayList<Integer> getAlignedBlockSizes(int len, int k, int align) { + int blklen = (int)(Math.ceil((double)len/k)); + blklen += ((blklen%align != 0) ? 
align-blklen%align : 0); + ArrayList<Integer> ret = new ArrayList<>(len/blklen); + for(int i=0; i<len; i+=blklen) + ret.add(Math.min(blklen, len-i)); + return ret; + } + + private static ArrayList<Integer> getBalancedBlockSizes(int len, int k) { + ArrayList<Integer> ret = new ArrayList<>(k); + int base = len / k; + int rest = len % k; + for( int i=0; i<k; i++ ) { + int val = base + (i<rest?1:0); + if( val > 0 ) + ret.add(val); + } + return ret; + } + public static boolean isInBlockRange( MatrixIndexes ix, int brlen, int bclen, long rl, long ru, long cl, long cu ) { long bRLowerIndex = (ix.getRowIndex()-1)*brlen + 1;
