[MINOR] Fix consistency of task partitioning in mm, mmchain, codegen row

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/d75a669a
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/d75a669a
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/d75a669a

Branch: refs/heads/master
Commit: d75a669a46381a0a5b54109e7b207613e17ab54e
Parents: 06d5bb0
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Sun Oct 29 16:06:55 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sun Oct 29 16:07:05 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofRowwise.java     | 19 +++----
 .../runtime/matrix/data/LibMatrixMult.java      | 56 ++++++++++++--------
 2 files changed, 42 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
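
The change consolidates the per-operation blklen arithmetic into shared helpers (getBalancedBlockSizesDefault, getAlignedBlockSizes) that return a list of block sizes, which the task-creation loops then consume with a running lower bound. The standalone sketch below illustrates that partitioning idiom only; the class and helper names are hypothetical, not SystemML API, and the exact size distribution of the real getBalancedBlockSizes may differ.

import java.util.ArrayList;

// Sketch (hypothetical names): split len rows into blocks whose sizes differ by
// at most one, then derive disjoint task ranges [lb, lb+blklen) the same way the
// task-creation loops in the hunks below do.
public class BlockPartitionSketch {
    static ArrayList<Integer> balancedBlockSizes(int len, int k) {
        ArrayList<Integer> ret = new ArrayList<>();
        int base = len / k, rest = len % k;
        for( int i=0; i<k; i++ ) {
            int blklen = base + ((i < rest) ? 1 : 0);
            if( blklen > 0 )
                ret.add(blklen); //skip empty blocks when len < k
        }
        return ret;
    }

    public static void main(String[] args) {
        ArrayList<Integer> blklens = balancedBlockSizes(10, 4); // [3, 3, 2, 2]
        //same loop shape as the new task-creation code
        for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
            System.out.println("task range: [" + lb + ", " + (lb + blklens.get(i)) + ")");
    }
}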


http://git-wip-us.apache.org/repos/asf/systemml/blob/d75a669a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index 9d5675b..b0afd88 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -39,7 +39,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.data.SparseRow;
 import org.apache.sysml.runtime.matrix.data.SparseRowVector;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 
 public abstract class SpoofRowwise extends SpoofOperator
@@ -198,11 +197,9 @@ public abstract class SpoofRowwise extends SpoofOperator
                
                //core parallel execute
                ExecutorService pool = Executors.newFixedThreadPool( k );
-               int nk = (a instanceof CompressedMatrixBlock) ? k :
-                       UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
-               int blklen = (int)(Math.ceil((double)m/nk));
-               if( a instanceof CompressedMatrixBlock )
-                       blklen = BitmapEncoder.getAlignedBlocksize(blklen);
+               ArrayList<Integer> blklens = (a instanceof CompressedMatrixBlock) ?
+                       LibMatrixMult.getAlignedBlockSizes(m, k, BitmapEncoder.BITMAP_BLOCK_SZ) :
+                       LibMatrixMult.getBalancedBlockSizesDefault(m, k, false);
                
                try
                {
@@ -210,9 +207,9 @@ public abstract class SpoofRowwise extends SpoofOperator
                                //execute tasks
                                ArrayList<ParColAggTask> tasks = new ArrayList<>();
                                int outLen = out.getNumRows() * out.getNumColumns();
-                               for( int i=0; i<nk & i*blklen<m; i++ )
-                                       tasks.add(new ParColAggTask(a, b, scalars, n, n2, outLen, i*blklen, Math.min((i+1)*blklen, m)));
-                               List<Future<double[]>> taskret = pool.invokeAll(tasks); 
+                               for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
+                                       tasks.add(new ParColAggTask(a, b, scalars, n, n2, outLen, lb, lb+blklens.get(i)));
+                               List<Future<double[]>> taskret = pool.invokeAll(tasks);
                                //aggregate partial results
                                int len = _type.isColumnAgg() ? out.getNumRows()*out.getNumColumns() : 1;
                                for( Future<double[]> task : taskret )
@@ -222,8 +219,8 @@ public abstract class SpoofRowwise extends SpoofOperator
                        else {
                                //execute tasks
                                ArrayList<ParExecTask> tasks = new ArrayList<>();
-                               for( int i=0; i<nk & i*blklen<m; i++ )
-                                       tasks.add(new ParExecTask(a, b, out, scalars, n, n2, i*blklen, Math.min((i+1)*blklen, m)));
+                               for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
+                                       tasks.add(new ParExecTask(a, b, out, scalars, n, n2, lb, lb+blklens.get(i)));
                                List<Future<Long>> taskret = pool.invokeAll(tasks);
                                //aggregate nnz, no need to aggregate results
                                long nnz = 0;

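For the compressed-input path above, the row blocks are now obtained from LibMatrixMult.getAlignedBlockSizes(m, k, BitmapEncoder.BITMAP_BLOCK_SZ), i.e. block boundaries stay aligned to the compressed bitmap block size (the helper itself is added in LibMatrixMult below). A small worked example of that rounding, using a made-up align value of 128 instead of the real BITMAP_BLOCK_SZ constant:

import java.util.ArrayList;

// Worked example mirroring the getAlignedBlockSizes helper added below;
// len, k and align are illustrative values only.
public class AlignedBlockSketch {
    public static void main(String[] args) {
        int len = 1000, k = 8, align = 128;
        int blklen = (int) Math.ceil((double) len / k);               // 125
        blklen += (blklen % align != 0) ? align - blklen % align : 0; // rounded up to 128
        ArrayList<Integer> blklens = new ArrayList<>();
        for( int i=0; i<len; i+=blklen )
            blklens.add(Math.min(blklen, len-i)); // last block takes the remainder
        System.out.println(blklens); // [128, 128, 128, 128, 128, 128, 128, 104]
    }
}
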
http://git-wip-us.apache.org/repos/asf/systemml/blob/d75a669a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index 684f327..a1f648e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -209,8 +209,7 @@ public class LibMatrixMult
                try {
                        ExecutorService pool = Executors.newFixedThreadPool( k );
                        ArrayList<MatrixMultTask> tasks = new ArrayList<>();
-                       int nk = (pm2r||pm2c) ? k : UtilFunctions.roundToNext(Math.min(8*k,num/32), k);
-                       ArrayList<Integer> blklens = getBalancedBlockSizes(num, nk);
+                       ArrayList<Integer> blklens = getBalancedBlockSizesDefault(num, k, (pm2r||pm2c));
                        for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
                                tasks.add(new MatrixMultTask(m1, m2, ret, tm2, pm2r, pm2c, lb, lb+blklens.get(i)));
                        //execute tasks
@@ -261,7 +260,7 @@ public class LibMatrixMult
                }
 
                //Timing time = new Timing(true);
-                               
+               
                //pre-processing: output allocation
                ret.sparse = false;
                ret.allocateDenseBlock();
@@ -312,7 +311,7 @@ public class LibMatrixMult
                }
                
                //Timing time = new Timing(true);
-                               
+               
                //pre-processing (no need to check isThreadSafe)
                ret.sparse = false;
                ret.allocateDenseBlock();
@@ -321,11 +320,10 @@ public class LibMatrixMult
                //(currently: always parallelization over number of rows)
                try {
                        ExecutorService pool = Executors.newFixedThreadPool( k );
+                       ArrayList<Integer> blklens = getBalancedBlockSizesDefault(mX.rlen, k, true);
                        ArrayList<MatrixMultChainTask> tasks = new ArrayList<>();
-                       int blklen = (int)(Math.ceil((double)mX.rlen/k));
-                       blklen += (blklen%24 != 0)?24-blklen%24:0;
-                       for( int i=0; i<k & i*blklen<mX.rlen; i++ )
-                               tasks.add(new MatrixMultChainTask(mX, mV, mW, ct, i*blklen, Math.min((i+1)*blklen, mX.rlen)));
+                       for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
+                               tasks.add(new MatrixMultChainTask(mX, mV, mW, ct, lb, lb+blklens.get(i)));
                        //execute tasks
                        List<Future<double[]>> taskret = pool.invokeAll(tasks); 
                        pool.shutdown();
@@ -1606,10 +1604,18 @@ public class LibMatrixMult
                final int blocksizeI = 24; // constraint: factor of 4
                final int blocksizeJ = 1024;
                double[] tmp = new double[blocksizeI];
+               final int bn = (ru-rl) % blocksizeI;
+               
+               //compute rest (not aligned to blocksize)
+               for( int i=rl, aix=rl*cd; i < rl+bn; i++, aix+=cd ) {
+                       double val = dotProduct(a, b, aix, 0, cd);
+                       val *= (weights) ? w[i] : 1;
+                       val -= (weights2) ? w[i] : 0;
+                       vectMultiplyAdd(val, a, c, aix, 0, cd);
+               }
                
                //blockwise mmchain computation
-               final int bn = ru - ru % blocksizeI; //rl blocksize aligned
-               for( int bi=rl; bi < bn; bi+=blocksizeI ) 
+               for( int bi=rl+bn; bi < ru; bi+=blocksizeI ) 
                {
                        //compute 1st matrix-vector for row block
                        Arrays.fill(tmp, 0);
@@ -1621,10 +1627,10 @@ public class LibMatrixMult
                        
                        //multiply/subtract weights (in-place), if required
                        if( weights ) 
-                               vectMultiply(w, tmp, bi, 0, blocksizeI);        
+                               vectMultiply(w, tmp, bi, 0, blocksizeI);
                        else if( weights2 )
                                vectSubtract(w, tmp, bi, 0, blocksizeI);
-                               
+                       
                        //compute 2nd matrix vector for row block and aggregate
                        for( int bj = 0; bj<cd; bj+=blocksizeJ ) {
                                int bjmin = Math.min(cd-bj, blocksizeJ);
@@ -1633,14 +1639,6 @@ public class LibMatrixMult
                                                a, c, aix, aix+cd, aix+2*cd, aix+3*cd, bj, bjmin);
                        }
                }
-               
-               //compute rest (not aligned to blocksize)
-               for( int i=bn, aix=bn*cd; i < ru; i++, aix+=cd ) {
-                       double val = dotProduct(a, b, aix, 0, cd);
-                       val *= (weights) ? w[i] : 1;
-                       val -= (weights2) ? w[i] : 0;
-                       vectMultiplyAdd(val, a, c, aix, 0, cd);
-               }
        }
 
        private static void matrixMultChainSparse(MatrixBlock mX, MatrixBlock mV, MatrixBlock mW, MatrixBlock ret, ChainType ct, int rl, int ru) 
@@ -3578,9 +3576,9 @@ public class LibMatrixMult
        
        public static boolean checkParColumnAgg(MatrixBlock m1, int k, boolean inclFLOPs) {
                return (8L * m1.clen * k <= MEM_OVERHEAD_THRESHOLD 
-                       && (!inclFLOPs || 4L * m1.rlen * m1.clen >= PAR_MINFLOP_THRESHOLD));
+                       && (!inclFLOPs || 4L * m1.rlen * m1.clen / (m1.sparse?2:1) >= PAR_MINFLOP_THRESHOLD));
        }
-
+       
        private static boolean checkParMatrixMultRightInputRows( MatrixBlock m1, MatrixBlock m2, int k ) {
                //parallelize over rows in rhs matrix if number of rows in lhs/output is very small
                return (m1.rlen==1 && LOW_LEVEL_OPTIMIZATION && m2.clen>1 && !(m1.isUltraSparse()||m2.isUltraSparse()))
@@ -3676,6 +3674,20 @@ public class LibMatrixMult
                
        }
 
+       public static ArrayList<Integer> getBalancedBlockSizesDefault(int len, int k, boolean constK) {
+               int nk = constK ? k : UtilFunctions.roundToNext(Math.min(8*k,len/32), k);
+               return getBalancedBlockSizes(len, nk);
+       }
+       
+       public static ArrayList<Integer> getAlignedBlockSizes(int len, int k, int align) {
+               int blklen = (int)(Math.ceil((double)len/k));
+               blklen += ((blklen%align != 0) ? align-blklen%align : 0);
+               ArrayList<Integer> ret = new ArrayList<>();
+               for(int i=0; i<len; i+=blklen)
+                       ret.add(Math.min(blklen, len-i));
+               return ret;
+       }
+       
        private static ArrayList<Integer> getBalancedBlockSizes(int len, int k) {
                ArrayList<Integer> ret = new ArrayList<>();
                int base = len / k;

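A note on the mmchain hunk above: the rows that do not fill a full blocksizeI block are now processed at the start of each task's range, with bn = (ru-rl) % blocksizeI, so the blocked loop over [rl+bn, ru) always covers a multiple of blocksizeI rows even when a task's rl is not itself aligned (the removed variant assumed an aligned rl, as its comment noted). A tiny check of that invariant; the example ranges are made up:

// Checks that with bn = (ru-rl) % blocksizeI the blocked part [rl+bn, ru)
// has a length divisible by blocksizeI for arbitrary task ranges.
public class MmchainSplitCheck {
    public static void main(String[] args) {
        final int blocksizeI = 24; // same blocksizeI constant as in the mmchain hunk above
        int[][] ranges = { {0, 1000}, {7, 250}, {100, 124} }; // hypothetical [rl, ru) ranges
        for( int[] r : ranges ) {
            int rl = r[0], ru = r[1];
            int bn = (ru - rl) % blocksizeI;
            System.out.println("rest=[" + rl + "," + (rl+bn) + "), blocked=[" + (rl+bn)
                + "," + ru + "), blocked length % blocksizeI = " + ((ru - rl - bn) % blocksizeI));
        }
    }
}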