Repository: systemml
Updated Branches:
  refs/heads/master 88a52eb04 -> 53b489ced


[SYSTEMML-2212] Performance multi-threaded unary aggregates

This patch makes two performance improvements for unary aggregates such
as sum(X), rowSums(X), and colSums(X). First, we now use data-size
thresholds for multi-threading that are aware of the shared thread pool
(introduced in SYSTEMML-2193) which makes multi-threading beneficial for
much smaller data sizes. Second, we use the task partitioning primitives
from matrix multiplication for better load balance.

In a scenario of 10,000 sum(X) operations, the cumulative execution time
improved as follows for different data sizes:
1)  1K x 1K  (8MB): 59.7s ->  4.4s
2)  2K x 1K (16MB):  5.8s ->  5.8s
3) 10K x 1K (80MB): 17.7s -> 12.4s


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8693ae65
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8693ae65
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8693ae65

Branch: refs/heads/master
Commit: 8693ae65169046cf545c1b93f75b7f4cc0ce6541
Parents: 88a52eb
Author: Matthias Boehm <[email protected]>
Authored: Sun Mar 25 21:19:47 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sun Mar 25 21:19:47 2018 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofRowwise.java     |  5 +--
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 38 +++++++++++---------
 .../runtime/matrix/data/LibMatrixMult.java      | 30 ++--------------
 .../sysml/runtime/util/UtilFunctions.java       | 26 ++++++++++++++
 4 files changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index 6e5646d..a4e2e7a 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -41,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.data.SparseRow;
 import org.apache.sysml.runtime.matrix.data.SparseRowVector;
 import org.apache.sysml.runtime.util.CommonThreadPool;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 
 public abstract class SpoofRowwise extends SpoofOperator
@@ -200,8 +201,8 @@ public abstract class SpoofRowwise extends SpoofOperator
                //core parallel execute
                ExecutorService pool = CommonThreadPool.get(k);
                ArrayList<Integer> blklens = (a instanceof 
CompressedMatrixBlock) ?
-                       LibMatrixMult.getAlignedBlockSizes(m, k, 
BitmapEncoder.BITMAP_BLOCK_SZ) :
-                       LibMatrixMult.getBalancedBlockSizesDefault(m, k, 
(long)m*n<16*PAR_NUMCELL_THRESHOLD);
+                       UtilFunctions.getAlignedBlockSizes(m, k, 
BitmapEncoder.BITMAP_BLOCK_SZ) :
+                       UtilFunctions.getBalancedBlockSizesDefault(m, k, 
(long)m*n<16*PAR_NUMCELL_THRESHOLD);
                
                try
                {

http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index f99accb..2d81255 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Future;
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.functionobjects.CM;
@@ -81,7 +82,8 @@ public class LibMatrixAgg
 {
        //internal configuration parameters
        private static final boolean NAN_AWARENESS = false;
-       private static final long PAR_NUMCELL_THRESHOLD = 1024*1024;   //Min 1M 
elements
+       private static final long PAR_NUMCELL_THRESHOLD1 = 1024*1024; //Min 1M 
elements
+       private static final long PAR_NUMCELL_THRESHOLD2 = 16*1024;   //Min 16K 
elements
        private static final long PAR_INTERMEDIATE_SIZE_THRESHOLD = 
2*1024*1024; //Max 2MB
        
        ////////////////////////////////
@@ -209,9 +211,7 @@ public class LibMatrixAgg
 
        public static void aggregateUnaryMatrix(MatrixBlock in, MatrixBlock 
out, AggregateUnaryOperator uaop, int k) {
                //fall back to sequential version if necessary
-               if(    k <= 1 || (long)in.nonZeros < PAR_NUMCELL_THRESHOLD || 
in.rlen <= k/2
-                       || (!(uaop.indexFn instanceof ReduceCol) &&  
out.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD ) || 
-                       !out.isThreadSafe()) {
+               if( !satisfiesMultiThreadingConstraints(in, out, uaop, k) ) {
                        aggregateUnaryMatrix(in, out, uaop);
                        return;
                }
@@ -241,11 +241,12 @@ public class LibMatrixAgg
                try {
                        ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<AggTask> tasks = new ArrayList<>();
-                       int blklen = (int)(Math.ceil((double)m/k));
-                       for( int i=0; i<k & i*blklen<m; i++ ) {
+                       ArrayList<Integer> blklens = 
UtilFunctions.getBalancedBlockSizesDefault(m, k,
+                               (uaop.indexFn instanceof ReduceRow)); //use 
static partitioning for col*()
+                       for( int i=0, lb=0; i<blklens.size(); 
lb+=blklens.get(i), i++ ) {
                                tasks.add( (uaop.indexFn instanceof ReduceCol) 
? 
-                                               new RowAggTask(in, out, 
aggtype, uaop, i*blklen, Math.min((i+1)*blklen, m)) :
-                                               new PartialAggTask(in, out, 
aggtype, uaop, i*blklen, Math.min((i+1)*blklen, m)) );
+                                       new RowAggTask(in, out, aggtype, uaop, 
lb, lb+blklens.get(i)) :
+                                       new PartialAggTask(in, out, aggtype, 
uaop, lb, lb+blklens.get(i)) );
                        }
                        pool.invokeAll(tasks);
                        pool.shutdown();
@@ -259,7 +260,7 @@ public class LibMatrixAgg
                catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
-                               
+               
                //cleanup output and change representation (if necessary)
                out.recomputeNonZeros();
                out.examSparsity();
@@ -303,7 +304,7 @@ public class LibMatrixAgg
                AggregateUnaryOperator uaop = 
InstructionUtils.parseBasicCumulativeAggregateUnaryOperator(uop);
                
                //fall back to sequential if necessary or agg not supported
-               if(    k <= 1 || (long)in.rlen*in.clen < PAR_NUMCELL_THRESHOLD 
|| in.rlen <= k
+               if(    k <= 1 || (long)in.rlen*in.clen < PAR_NUMCELL_THRESHOLD1 
|| in.rlen <= k
                        || out.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD || 
uaop == null || !out.isThreadSafe()) {
                        return cumaggregateUnaryMatrix(in, out, uop);
                }
@@ -407,7 +408,7 @@ public class LibMatrixAgg
 
        public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock 
in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op, int k) {
                //fall back to sequential version if necessary
-               if( k <= 1 || in1.nonZeros+in2.nonZeros < PAR_NUMCELL_THRESHOLD 
|| in1.rlen <= k/2 
+               if( k <= 1 || in1.nonZeros+in2.nonZeros < 
PAR_NUMCELL_THRESHOLD1 || in1.rlen <= k/2 
                        || (!(op.indexFn instanceof ReduceCol) &&  ret.clen*8*k 
> PAR_INTERMEDIATE_SIZE_THRESHOLD) ) {
                        return aggregateTernary(in1, in2, in3, ret, op);
                }
@@ -479,7 +480,7 @@ public class LibMatrixAgg
        {
                //fall back to sequential version if necessary
                boolean rowVector = (target.getNumRows()==1 && 
target.getNumColumns()>1);
-               if( k <= 1 || (long)target.rlen*target.clen < 
PAR_NUMCELL_THRESHOLD || rowVector || target.clen==1) {
+               if( k <= 1 || (long)target.rlen*target.clen < 
PAR_NUMCELL_THRESHOLD1 || rowVector || target.clen==1) {
                        groupedAggregate(groups, target, weights, result, 
numGroups, op);
                        return;
                }
@@ -514,18 +515,23 @@ public class LibMatrixAgg
                result.examSparsity();
        }
 
-       public static boolean isSupportedUnaryAggregateOperator( 
AggregateUnaryOperator op )
-       {
+       public static boolean isSupportedUnaryAggregateOperator( 
AggregateUnaryOperator op ) {
                AggType type = getAggType( op );
                return (type != AggType.INVALID);
        }
        
-       public static boolean isSupportedUnaryOperator( UnaryOperator op )
-       {
+       public static boolean isSupportedUnaryOperator( UnaryOperator op ) {
                AggType type = getAggType( op );
                return (type != AggType.INVALID);
        }
        
+       public static boolean satisfiesMultiThreadingConstraints(MatrixBlock 
in, MatrixBlock out, AggregateUnaryOperator uaop, int k) {
+               boolean sharedTP = 
(InfrastructureAnalyzer.getLocalParallelism() == k);
+               return k > 1 && out.isThreadSafe() && in.rlen > (sharedTP ? k/8 
: k/2)
+                       && (uaop.indexFn instanceof ReduceCol || out.clen*8*k < 
PAR_INTERMEDIATE_SIZE_THRESHOLD) //size
+                       && in.nonZeros > (sharedTP ? PAR_NUMCELL_THRESHOLD2 : 
PAR_NUMCELL_THRESHOLD1);
+       }
+       
        /**
         * Recompute outputs (e.g., maxindex or minindex) according to block 
indexes from MR.
         * TODO: this should not be part of block operations but of the MR 
instruction.

http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index e7290a8..1eef244 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -198,7 +198,7 @@ public class LibMatrixMult
                try {
                        ExecutorService pool = CommonThreadPool.get(k);
                        ArrayList<MatrixMultTask> tasks = new ArrayList<>();
-                       ArrayList<Integer> blklens = 
getBalancedBlockSizesDefault(num, k, (pm2r||pm2c));
+                       ArrayList<Integer> blklens = 
UtilFunctions.getBalancedBlockSizesDefault(num, k, (pm2r||pm2c));
                        for( int i=0, lb=0; i<blklens.size(); 
lb+=blklens.get(i), i++ )
                                tasks.add(new MatrixMultTask(m1, m2, ret, tm2, 
pm2r, pm2c, lb, lb+blklens.get(i)));
                        //execute tasks
@@ -303,7 +303,7 @@ public class LibMatrixMult
                //(currently: always parallelization over number of rows)
                try {
                        ExecutorService pool = CommonThreadPool.get(k);
-                       ArrayList<Integer> blklens = 
getBalancedBlockSizesDefault(mX.rlen, k, true);
+                       ArrayList<Integer> blklens = 
UtilFunctions.getBalancedBlockSizesDefault(mX.rlen, k, true);
                        ArrayList<MatrixMultChainTask> tasks = new 
ArrayList<>();
                        for( int i=0, lb=0; i<blklens.size(); 
lb+=blklens.get(i), i++ )
                                tasks.add(new MatrixMultChainTask(mX, mV, mW, 
ct, lb, lb+blklens.get(i)));
@@ -3706,32 +3706,6 @@ public class LibMatrixMult
                }
                
        }
-
-       public static ArrayList<Integer> getBalancedBlockSizesDefault(int len, 
int k, boolean constK) {
-               int nk = constK ? k : 
UtilFunctions.roundToNext(Math.min(8*k,len/32), k);
-               return getBalancedBlockSizes(len, nk);
-       }
-       
-       public static ArrayList<Integer> getAlignedBlockSizes(int len, int k, 
int align) {
-               int blklen = (int)(Math.ceil((double)len/k));
-               blklen += ((blklen%align != 0) ? align-blklen%align : 0);
-               ArrayList<Integer> ret = new ArrayList<>();
-               for(int i=0; i<len; i+=blklen)
-                       ret.add(Math.min(blklen, len-i));
-               return ret;
-       }
-       
-       private static ArrayList<Integer> getBalancedBlockSizes(int len, int k) 
{
-               ArrayList<Integer> ret = new ArrayList<>();
-               int base = len / k;
-               int rest = len % k;
-               for( int i=0; i<k; i++ ) {
-                       int val = base + (i<rest?1:0);
-                       if( val > 0 )
-                               ret.add(val);
-               }       
-               return ret; 
-       }
        
        /////////////////////////////////////////////////////////
        // Task Implementations for Multi-Threaded Operations  //

http://git-wip-us.apache.org/repos/asf/systemml/blob/8693ae65/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index e0ac955..f0911bb 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -150,6 +150,32 @@ public class UtilFunctions
                return (int)Math.min(blockSize, remain);
        }
 
+       public static ArrayList<Integer> getBalancedBlockSizesDefault(int len, 
int k, boolean constK) {
+               int nk = constK ? k : roundToNext(Math.min(8*k,len/32), k);
+               return getBalancedBlockSizes(len, nk);
+       }
+       
+       public static ArrayList<Integer> getAlignedBlockSizes(int len, int k, 
int align) {
+               int blklen = (int)(Math.ceil((double)len/k));
+               blklen += ((blklen%align != 0) ? align-blklen%align : 0);
+               ArrayList<Integer> ret = new ArrayList<>(len/blklen);
+               for(int i=0; i<len; i+=blklen)
+                       ret.add(Math.min(blklen, len-i));
+               return ret;
+       }
+       
+       private static ArrayList<Integer> getBalancedBlockSizes(int len, int k) 
{
+               ArrayList<Integer> ret = new ArrayList<>(k);
+               int base = len / k;
+               int rest = len % k;
+               for( int i=0; i<k; i++ ) {
+                       int val = base + (i<rest?1:0);
+                       if( val > 0 )
+                               ret.add(val);
+               }
+               return ret;
+       }
+       
        public static boolean isInBlockRange( MatrixIndexes ix, int brlen, int 
bclen, long rl, long ru, long cl, long cu )
        {
                long bRLowerIndex = (ix.getRowIndex()-1)*brlen + 1;

Reply via email to