[SYSTEMML-1512] Performance improvements for Spark rowwise codegen instructions

This patch makes the following incremental performance improvements to
the Spark instruction for rowwise codegen templates:

(1) Partitioning-preserving operations for row aggregates (row-wise
templates are constrained to clen<=bclen, i.e., a single column block,
and hence row aggregates always preserve the input partitioning)

(2) Reduced allocation of temporary row vectors (we now allocate
temporary row vectors once per partition instead of once per block)

(3) Incremental aggregation for column aggregates (we now incrementally
aggregate a single output vector per partition for column aggregates
instead of allocating a separate output vector per block)

In a scenario with a 10M x 10K dense input matrix and 100 iterations of a
row-wise template with broadcasts and two vector intermediates, changes
(2) and (3) reduced the total runtime from 253s to 204s.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0d625a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0d625a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0d625a05

Branch: refs/heads/master
Commit: 0d625a05e44e8e613651be4bf9ae4b833c087ffd
Parents: 1cd6286
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Fri Apr 14 23:33:10 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Fri Apr 14 23:33:10 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofRowwise.java     | 24 +++++--
 .../instructions/spark/SpoofSPInstruction.java  | 66 +++++++++++++-------
 2 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0d625a05/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java 
b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index b100a89..8b9fb4d 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -61,6 +61,10 @@ public abstract class SpoofRowwise extends SpoofOperator
        public RowType getRowType() {
                return _type;
        }
+       
+       public int getNumIntermediates() {
+               return _reqVectMem;
+       }
 
        @Override
        public String getSpoofType() {
@@ -69,7 +73,13 @@ public abstract class SpoofRowwise extends SpoofOperator
        
        @Override
        public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, MatrixBlock out)      
-               throws DMLRuntimeException
+               throws DMLRuntimeException 
+       {
+               execute(inputs, scalarObjects, out, true, false);
+       }
+       
+       public void execute(ArrayList<MatrixBlock> inputs, 
ArrayList<ScalarObject> scalarObjects, MatrixBlock out, boolean allocTmp, 
boolean aggIncr) 
+               throws DMLRuntimeException      
        {
                //sanity check
                if( inputs==null || inputs.size() < 1 || out==null )
@@ -78,23 +88,27 @@ public abstract class SpoofRowwise extends SpoofOperator
                //result allocation and preparations
                final int m = inputs.get(0).getNumRows();
                final int n = inputs.get(0).getNumColumns();
-               allocateOutputMatrix(m, n, out);
+               if( !aggIncr || !out.isAllocated() )
+                       allocateOutputMatrix(m, n, out);
                double[] c = out.getDenseBlock();
                
                //input preparation
                double[][] b = prepInputMatrices(inputs);
                double[] scalars = prepInputScalars(scalarObjects);
                
-               //core sequential execute
+               //setup thread-local memory if necessary
+               if( allocTmp )
+                       LibSpoofPrimitives.setupThreadLocalMemory(_reqVectMem, 
n);
                
-               LibSpoofPrimitives.setupThreadLocalMemory(_reqVectMem, n);
+               //core sequential execute
                if( !inputs.get(0).isInSparseFormat() )
                        executeDense(inputs.get(0).getDenseBlock(), b, scalars, 
c, n, 0, m);
                else
                        executeSparse(inputs.get(0).getSparseBlock(), b, 
scalars, c, n, 0, m);
        
                //post-processing
-               LibSpoofPrimitives.cleanupThreadLocalMemory();
+               if( allocTmp )
+                       LibSpoofPrimitives.cleanupThreadLocalMemory();
                out.recomputeNonZeros();
                out.examSparsity();
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0d625a05/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index be8e849..be3a76d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -31,6 +31,7 @@ import 
org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.codegen.CodegenUtils;
+import org.apache.sysml.runtime.codegen.LibSpoofPrimitives;
 import org.apache.sysml.runtime.codegen.SpoofCellwise;
 import org.apache.sysml.runtime.codegen.SpoofMultiAggregate;
 import org.apache.sysml.runtime.codegen.SpoofCellwise.AggOp;
@@ -198,16 +199,15 @@ public class SpoofSPInstruction extends SPInstruction
                }
                else if( _class.getSuperclass() == SpoofRowwise.class ) { //row 
aggregate operator
                        SpoofRowwise op = (SpoofRowwise) 
CodegenUtils.createInstance(_class);   
-                       RowwiseFunction fmmc = new 
RowwiseFunction(_class.getName(), _classBytes, bcMatrices, scalars);
+                       RowwiseFunction fmmc = new 
RowwiseFunction(_class.getName(), _classBytes, bcMatrices, scalars, 
(int)mcIn.getCols());
+                       out = in.mapPartitionsToPair(fmmc, 
op.getRowType()==RowType.ROW_AGG);
                        
                        if( op.getRowType().isColumnAgg() ) {
-                               JavaPairRDD<MatrixIndexes,MatrixBlock> tmpRDD = 
in.mapToPair(fmmc);
-                               MatrixBlock tmpMB = 
RDDAggregateUtils.sumStable(tmpRDD);                
+                               MatrixBlock tmpMB = 
RDDAggregateUtils.sumStable(out);           
                                sec.setMatrixOutput(_out.getName(), tmpMB);
                        }
                        else //row-agg or no-agg 
                        {
-                               out = in.mapToPair(fmmc);
                                if( op.getRowType()==RowType.ROW_AGG && 
mcIn.getCols() > mcIn.getColsPerBlock() ) {
                                        //TODO investigate if some other side 
effect of correct blocks
                                        if( out.partitions().size() > 
mcIn.getNumRowBlocks() )
@@ -275,27 +275,29 @@ public class SpoofSPInstruction extends SPInstruction
                }
        }
                
-       private static class RowwiseFunction implements 
PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 
+       private static class RowwiseFunction implements 
PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock>>, 
MatrixIndexes, MatrixBlock> 
        {
                private static final long serialVersionUID = 
-7926980450209760212L;
 
-               private ArrayList<PartitionedBroadcast<MatrixBlock>> _vectors = 
null;
-               private ArrayList<ScalarObject> _scalars = null;
-               private byte[] _classBytes = null;
-               private String _className = null;
+               private final ArrayList<PartitionedBroadcast<MatrixBlock>> 
_vectors;
+               private final ArrayList<ScalarObject> _scalars;
+               private final byte[] _classBytes;
+               private final String _className;
+               private final int _clen;
                private SpoofRowwise _op = null;
                
-               public RowwiseFunction(String className, byte[] classBytes, 
ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, 
ArrayList<ScalarObject> scalars) 
+               public RowwiseFunction(String className, byte[] classBytes, 
ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, 
ArrayList<ScalarObject> scalars, int clen) 
                        throws DMLRuntimeException
                {                       
                        _className = className;
                        _classBytes = classBytes;
                        _vectors = bcMatrices;
                        _scalars = scalars;
+                       _clen = clen;
                }
                
                @Override
-               public Tuple2<MatrixIndexes, MatrixBlock> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg ) 
                        throws Exception 
                {
                        //lazy load of shipped class
@@ -304,21 +306,37 @@ public class SpoofSPInstruction extends SPInstruction
                                _op = (SpoofRowwise) 
CodegenUtils.createInstance(loadedClass); 
                        }
                        
-                       //get main input block and indexes
-                       MatrixIndexes ixIn = arg0._1();
-                       MatrixBlock blkIn = arg0._2();
-                       int rowIx = (int)ixIn.getRowIndex();
+                       //setup local memory for reuse
+                       
LibSpoofPrimitives.setupThreadLocalMemory(_op.getNumIntermediates(), _clen);
                        
-                       //prepare output and execute single-threaded operator
-                       ArrayList<MatrixBlock> inputs = 
getVectorInputsFromBroadcast(blkIn, rowIx);
-                       MatrixIndexes ixOut = new MatrixIndexes(
-                               _op.getRowType().isColumnAgg() ? 1 : 
ixIn.getRowIndex(),
-                               _op.getRowType()!=RowType.NO_AGG ? 1 : 
ixIn.getColumnIndex());
-                       MatrixBlock blkOut = new MatrixBlock();
-                       _op.execute(inputs, _scalars, blkOut);
+                       ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+                       boolean aggIncr = _op.getRowType().isColumnAgg(); 
//aggregate entire partition to avoid allocations
+                       MatrixBlock blkOut = aggIncr ? new MatrixBlock() : null;
                        
-                       //output new tuple
-                       return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, 
blkOut);
+                       while( arg.hasNext() ) {
+                               //get main input block and indexes
+                               Tuple2<MatrixIndexes,MatrixBlock> e = 
arg.next();
+                               MatrixIndexes ixIn = e._1();
+                               MatrixBlock blkIn = e._2();
+                               int rowIx = (int)ixIn.getRowIndex();
+                               
+                               //prepare output and execute single-threaded 
operator
+                               ArrayList<MatrixBlock> inputs = 
getVectorInputsFromBroadcast(blkIn, rowIx);
+                               blkOut = aggIncr ? blkOut : new MatrixBlock();
+                               _op.execute(inputs, _scalars, blkOut, false, 
aggIncr);
+                               if( !aggIncr ) {
+                                       MatrixIndexes ixOut = new 
MatrixIndexes(ixIn.getRowIndex(),
+                                               
_op.getRowType()!=RowType.NO_AGG ? 1 : ixIn.getColumnIndex());
+                                       ret.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(ixOut, blkOut));
+                               }
+                       }
+                       
+                       //cleanup and final result preparations
+                       LibSpoofPrimitives.cleanupThreadLocalMemory();
+                       if( aggIncr )
+                               ret.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(new MatrixIndexes(1,1), blkOut));
+                       
+                       return ret.iterator();
                }
                
                private ArrayList<MatrixBlock> 
getVectorInputsFromBroadcast(MatrixBlock blkIn, int rowIndex) 

Reply via email to