[SYSTEMML-749] Fix spark removeEmpty instruction w/ empty input matrix

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/870da0f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/870da0f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/870da0f9

Branch: refs/heads/master
Commit: 870da0f9f6b7dcd99b64b1bef61d996f8065e529
Parents: 9b3c042
Author: Matthias Boehm <[email protected]>
Authored: Thu Jun 2 19:02:51 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jun 2 19:02:51 2016 -0700

----------------------------------------------------------------------
 .../ParameterizedBuiltinSPInstruction.java | 81 +++++++++++---------
 1 file changed, 45 insertions(+), 36 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/870da0f9/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index b8d2f6c..986d8e8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -291,46 +291,55 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
 			String rddInVar = params.get("target");
 			String rddOffVar = params.get("offset");
 
-			//get input rdd handle
-			JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( rddInVar );
-			JavaPairRDD<MatrixIndexes,MatrixBlock> off;
-			PartitionedBroadcastMatrix broadcastOff;
-			MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(rddInVar);
 			boolean rows = sec.getScalarInput(params.get("margin"), ValueType.STRING, true).getStringValue().equals("rows");
 			long maxDim = sec.getScalarInput(params.get("maxdim"), ValueType.DOUBLE, false).getLongValue();
-			long brlen = mcIn.getRowsPerBlock();
-			long bclen = mcIn.getColsPerBlock();
-			long numRep = (long)Math.ceil( rows ? (double)mcIn.getCols()/bclen : (double)mcIn.getRows()/brlen);
+			MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(rddInVar);
 
-			//execute remove empty rows/cols operation
-			JavaPairRDD<MatrixIndexes,MatrixBlock> out;
-
-			if(_bRmEmptyBC){
-				broadcastOff = sec.getBroadcastForVariable(rddOffVar );
-				// Broadcast offset vector
-				out = in
-					.flatMapToPair(new RDDRemoveEmptyFunctionInMem(rows, maxDim, brlen, bclen, broadcastOff));
+			if( maxDim > 0 ) //default case
+			{
+				//get input rdd handle
+				JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable( rddInVar );
+				JavaPairRDD<MatrixIndexes,MatrixBlock> off;
+				PartitionedBroadcastMatrix broadcastOff;
+				long brlen = mcIn.getRowsPerBlock();
+				long bclen = mcIn.getColsPerBlock();
+				long numRep = (long)Math.ceil( rows ? (double)mcIn.getCols()/bclen : (double)mcIn.getRows()/brlen);
+
+				//execute remove empty rows/cols operation
+				JavaPairRDD<MatrixIndexes,MatrixBlock> out;
+
+				if(_bRmEmptyBC){
+					broadcastOff = sec.getBroadcastForVariable( rddOffVar );
+					// Broadcast offset vector
+					out = in
+						.flatMapToPair(new RDDRemoveEmptyFunctionInMem(rows, maxDim, brlen, bclen, broadcastOff));
+				}
+				else {
+					off = sec.getBinaryBlockRDDHandleForVariable( rddOffVar );
+					out = in
+						.join( off.flatMapToPair(new ReplicateVectorFunction(!rows,numRep)) )
+						.flatMapToPair(new RDDRemoveEmptyFunction(rows, maxDim, brlen, bclen));
+				}
+
+				out = RDDAggregateUtils.mergeByKey(out);
+
+				//store output rdd handle
+				sec.setRDDHandleForVariable(output.getName(), out);
+				sec.addLineageRDD(output.getName(), rddInVar);
+				if(!_bRmEmptyBC)
+					sec.addLineageRDD(output.getName(), rddOffVar);
+				else
+					sec.addLineageBroadcast(output.getName(), rddOffVar);
+
+				//update output statistics (required for correctness)
+				MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+				mcOut.set(rows?maxDim:mcIn.getRows(), rows?mcIn.getCols():maxDim, (int)brlen, (int)bclen, mcIn.getNonZeros());
+			}
+			else //special case: empty output (ensure valid dims)
+			{
+				MatrixBlock out = new MatrixBlock(rows?1:(int)mcIn.getRows(), rows?(int)mcIn.getCols():1, true);
+				sec.setMatrixOutput(output.getName(), out);
 			}
-			else {
-				off = sec.getBinaryBlockRDDHandleForVariable( rddOffVar );
-				out = in
-					.join( off.flatMapToPair(new ReplicateVectorFunction(!rows,numRep)) )
-					.flatMapToPair(new RDDRemoveEmptyFunction(rows, maxDim, brlen, bclen));
-			}
-
-			out = RDDAggregateUtils.mergeByKey(out);
-
-			//store output rdd handle
-			sec.setRDDHandleForVariable(output.getName(), out);
-			sec.addLineageRDD(output.getName(), rddInVar);
-			if(!_bRmEmptyBC)
-				sec.addLineageRDD(output.getName(), rddOffVar);
-			else
-				sec.addLineageBroadcast(output.getName(), rddOffVar); // TODO
-
-			//update output statistics (required for correctness)
-			MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
-			mcOut.set(rows?maxDim:mcIn.getRows(), rows?mcIn.getCols():maxDim, (int)brlen, (int)bclen, mcIn.getNonZeros());
 		}
 		else if ( opcode.equalsIgnoreCase("replace") )
 		{
