[SYSTEMML-1512] Performance of Spark row-wise codegen instructions. This patch makes the following incremental performance improvements to the Spark instruction for row-wise codegen templates:
(1) Partitioning-preserving operations for row-aggregates (row-wise templates have a constraint of clen<=bclen, i.e., a single column block, and hence row aggregates are always partitioning preserving) (2) Reduced allocation of temporary row vectors (we now allocate temporary row vectors once per partition instead of once per block) (3) Incremental aggregation for column aggregates (we now incrementally aggregate output vectors for column aggregates instead of allocating them per block) On a scenario of a 10M x 10K dense input matrix and 100 iterations of a row-wise template with broadcasts and two vector intermediates, changes (2) and (3) improved the total runtime from 253s to 204s. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0d625a05 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0d625a05 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0d625a05 Branch: refs/heads/master Commit: 0d625a05e44e8e613651be4bf9ae4b833c087ffd Parents: 1cd6286 Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Apr 14 23:33:10 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Fri Apr 14 23:33:10 2017 -0700 ---------------------------------------------------------------------- .../sysml/runtime/codegen/SpoofRowwise.java | 24 +++++-- .../instructions/spark/SpoofSPInstruction.java | 66 +++++++++++++------- 2 files changed, 61 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0d625a05/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java index b100a89..8b9fb4d 100644 --- 
a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java @@ -61,6 +61,10 @@ public abstract class SpoofRowwise extends SpoofOperator public RowType getRowType() { return _type; } + + public int getNumIntermediates() { + return _reqVectMem; + } @Override public String getSpoofType() { @@ -69,7 +73,13 @@ public abstract class SpoofRowwise extends SpoofOperator @Override public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out) - throws DMLRuntimeException + throws DMLRuntimeException + { + execute(inputs, scalarObjects, out, true, false); + } + + public void execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, boolean allocTmp, boolean aggIncr) + throws DMLRuntimeException { //sanity check if( inputs==null || inputs.size() < 1 || out==null ) @@ -78,23 +88,27 @@ public abstract class SpoofRowwise extends SpoofOperator //result allocation and preparations final int m = inputs.get(0).getNumRows(); final int n = inputs.get(0).getNumColumns(); - allocateOutputMatrix(m, n, out); + if( !aggIncr || !out.isAllocated() ) + allocateOutputMatrix(m, n, out); double[] c = out.getDenseBlock(); //input preparation double[][] b = prepInputMatrices(inputs); double[] scalars = prepInputScalars(scalarObjects); - //core sequential execute + //setup thread-local memory if necessary + if( allocTmp ) + LibSpoofPrimitives.setupThreadLocalMemory(_reqVectMem, n); - LibSpoofPrimitives.setupThreadLocalMemory(_reqVectMem, n); + //core sequential execute if( !inputs.get(0).isInSparseFormat() ) executeDense(inputs.get(0).getDenseBlock(), b, scalars, c, n, 0, m); else executeSparse(inputs.get(0).getSparseBlock(), b, scalars, c, n, 0, m); //post-processing - LibSpoofPrimitives.cleanupThreadLocalMemory(); + if( allocTmp ) + LibSpoofPrimitives.cleanupThreadLocalMemory(); out.recomputeNonZeros(); out.examSparsity(); } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0d625a05/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java index be8e849..be3a76d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java @@ -31,6 +31,7 @@ import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.codegen.CodegenUtils; +import org.apache.sysml.runtime.codegen.LibSpoofPrimitives; import org.apache.sysml.runtime.codegen.SpoofCellwise; import org.apache.sysml.runtime.codegen.SpoofMultiAggregate; import org.apache.sysml.runtime.codegen.SpoofCellwise.AggOp; @@ -198,16 +199,15 @@ public class SpoofSPInstruction extends SPInstruction } else if( _class.getSuperclass() == SpoofRowwise.class ) { //row aggregate operator SpoofRowwise op = (SpoofRowwise) CodegenUtils.createInstance(_class); - RowwiseFunction fmmc = new RowwiseFunction(_class.getName(), _classBytes, bcMatrices, scalars); + RowwiseFunction fmmc = new RowwiseFunction(_class.getName(), _classBytes, bcMatrices, scalars, (int)mcIn.getCols()); + out = in.mapPartitionsToPair(fmmc, op.getRowType()==RowType.ROW_AGG); if( op.getRowType().isColumnAgg() ) { - JavaPairRDD<MatrixIndexes,MatrixBlock> tmpRDD = in.mapToPair(fmmc); - MatrixBlock tmpMB = RDDAggregateUtils.sumStable(tmpRDD); + MatrixBlock tmpMB = RDDAggregateUtils.sumStable(out); sec.setMatrixOutput(_out.getName(), tmpMB); } else //row-agg or no-agg { - out = in.mapToPair(fmmc); if( op.getRowType()==RowType.ROW_AGG && mcIn.getCols() > 
mcIn.getColsPerBlock() ) { //TODO investigate if some other side effect of correct blocks if( out.partitions().size() > mcIn.getNumRowBlocks() ) @@ -275,27 +275,29 @@ public class SpoofSPInstruction extends SPInstruction } } - private static class RowwiseFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> + private static class RowwiseFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock>>, MatrixIndexes, MatrixBlock> { private static final long serialVersionUID = -7926980450209760212L; - private ArrayList<PartitionedBroadcast<MatrixBlock>> _vectors = null; - private ArrayList<ScalarObject> _scalars = null; - private byte[] _classBytes = null; - private String _className = null; + private final ArrayList<PartitionedBroadcast<MatrixBlock>> _vectors; + private final ArrayList<ScalarObject> _scalars; + private final byte[] _classBytes; + private final String _className; + private final int _clen; private SpoofRowwise _op = null; - public RowwiseFunction(String className, byte[] classBytes, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars) + public RowwiseFunction(String className, byte[] classBytes, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars, int clen) throws DMLRuntimeException { _className = className; _classBytes = classBytes; _vectors = bcMatrices; _scalars = scalars; + _clen = clen; } @Override - public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg ) throws Exception { //lazy load of shipped class @@ -304,21 +306,37 @@ public class SpoofSPInstruction extends SPInstruction _op = (SpoofRowwise) CodegenUtils.createInstance(loadedClass); } - //get main input block and indexes - MatrixIndexes ixIn = arg0._1(); - MatrixBlock blkIn = arg0._2(); - int rowIx = 
(int)ixIn.getRowIndex(); + //setup local memory for reuse + LibSpoofPrimitives.setupThreadLocalMemory(_op.getNumIntermediates(), _clen); - //prepare output and execute single-threaded operator - ArrayList<MatrixBlock> inputs = getVectorInputsFromBroadcast(blkIn, rowIx); - MatrixIndexes ixOut = new MatrixIndexes( - _op.getRowType().isColumnAgg() ? 1 : ixIn.getRowIndex(), - _op.getRowType()!=RowType.NO_AGG ? 1 : ixIn.getColumnIndex()); - MatrixBlock blkOut = new MatrixBlock(); - _op.execute(inputs, _scalars, blkOut); + ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); + boolean aggIncr = _op.getRowType().isColumnAgg(); //aggregate entire partition to avoid allocations + MatrixBlock blkOut = aggIncr ? new MatrixBlock() : null; - //output new tuple - return new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut); + while( arg.hasNext() ) { + //get main input block and indexes + Tuple2<MatrixIndexes,MatrixBlock> e = arg.next(); + MatrixIndexes ixIn = e._1(); + MatrixBlock blkIn = e._2(); + int rowIx = (int)ixIn.getRowIndex(); + + //prepare output and execute single-threaded operator + ArrayList<MatrixBlock> inputs = getVectorInputsFromBroadcast(blkIn, rowIx); + blkOut = aggIncr ? blkOut : new MatrixBlock(); + _op.execute(inputs, _scalars, blkOut, false, aggIncr); + if( !aggIncr ) { + MatrixIndexes ixOut = new MatrixIndexes(ixIn.getRowIndex(), + _op.getRowType()!=RowType.NO_AGG ? 1 : ixIn.getColumnIndex()); + ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut)); + } + } + + //cleanup and final result preparations + LibSpoofPrimitives.cleanupThreadLocalMemory(); + if( aggIncr ) + ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(1,1), blkOut)); + + return ret.iterator(); } private ArrayList<MatrixBlock> getVectorInputsFromBroadcast(MatrixBlock blkIn, int rowIndex)