[SYSTEMML-1977] Fix codegen spark row ops w/ multiple rdd inputs. This patch fixes special cases of distributed codegen spark row operations with multiple rdd inputs (i.e., in case of large side inputs that cannot be broadcast). We now handle the metadata management at the driver, which removes the implicit assumption that the relevant inputs for B1 row types are available as broadcasts.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/118e3c0f Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/118e3c0f Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/118e3c0f Branch: refs/heads/master Commit: 118e3c0f630d3a3b30755ecb712672d79f8b8d7c Parents: ede870d Author: Matthias Boehm <[email protected]> Authored: Fri Oct 27 23:13:54 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Oct 27 23:13:54 2017 -0700 ---------------------------------------------------------------------- .../instructions/spark/SpoofSPInstruction.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/118e3c0f/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java index b34afad..eb74fed 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java @@ -166,7 +166,7 @@ public class SpoofSPInstruction extends SPInstruction { } else if(_class.getSuperclass() == SpoofMultiAggregate.class) //MAGG { - SpoofMultiAggregate op = (SpoofMultiAggregate) CodegenUtils.createInstance(_class); + SpoofMultiAggregate op = (SpoofMultiAggregate) CodegenUtils.createInstance(_class); AggOp[] aggOps = op.getAggOps(); MatrixBlock tmpMB = in.mapToPair(new MultiAggregateFunction( @@ -178,7 +178,7 @@ public class SpoofSPInstruction extends SPInstruction { else if(_class.getSuperclass() == SpoofOuterProduct.class) //OUTER { if( _out.getDataType()==DataType.MATRIX ) { 
- SpoofOperator op = (SpoofOperator) CodegenUtils.createInstance(_class); + SpoofOperator op = (SpoofOperator) CodegenUtils.createInstance(_class); OutProdType type = ((SpoofOuterProduct)op).getOuterProdType(); //update matrix characteristics @@ -211,9 +211,11 @@ public class SpoofSPInstruction extends SPInstruction { throw new DMLRuntimeException("Invalid spark rowwise operator w/ ncol=" + mcIn.getCols()+", ncolpb="+mcIn.getColsPerBlock()+"."); } - SpoofRowwise op = (SpoofRowwise) CodegenUtils.createInstance(_class); + SpoofRowwise op = (SpoofRowwise) CodegenUtils.createInstance(_class); + long clen2 = (op.getRowType()==RowType.NO_AGG_CONST) ? op.getConstDim2() : + op.getRowType().isRowTypeB1() ? sec.getMatrixCharacteristics(_in[1].getName()).getCols() : -1; RowwiseFunction fmmc = new RowwiseFunction(_class.getName(), - _classBytes, bcVect2, bcMatrices, scalars, (int)mcIn.getCols()); + _classBytes, bcVect2, bcMatrices, scalars, (int)mcIn.getCols(), (int)clen2); out = in.mapPartitionsToPair(fmmc, op.getRowType()==RowType.ROW_AGG || op.getRowType() == RowType.NO_AGG); @@ -434,13 +436,15 @@ public class SpoofSPInstruction extends SPInstruction { private static final long serialVersionUID = -7926980450209760212L; private final int _clen; + private final int _clen2; private SpoofRowwise _op = null; - public RowwiseFunction(String className, byte[] classBytes, boolean[] bcInd, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars, int clen) + public RowwiseFunction(String className, byte[] classBytes, boolean[] bcInd, ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices, ArrayList<ScalarObject> scalars, int clen, int clen2) throws DMLRuntimeException { super(className, classBytes, bcInd, bcMatrices, scalars); _clen = clen; + _clen2 = clen2; } @Override @@ -454,9 +458,7 @@ public class SpoofSPInstruction extends SPInstruction { } //setup local memory for reuse - int clen2 = (int) ((_op.getRowType()==RowType.NO_AGG_CONST) ? 
_op.getConstDim2() : - _op.getRowType().isRowTypeB1() ? _inputs.get(0).getNumCols() : -1); - LibSpoofPrimitives.setupThreadLocalMemory(_op.getNumIntermediates(), _clen, clen2); + LibSpoofPrimitives.setupThreadLocalMemory(_op.getNumIntermediates(), _clen, _clen2); ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<>(); boolean aggIncr = (_op.getRowType().isColumnAgg() //aggregate entire partition
