[SYSTEMML-749] Fix spark removeEmpty instruction w/ empty input matrix

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/870da0f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/870da0f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/870da0f9

Branch: refs/heads/master
Commit: 870da0f9f6b7dcd99b64b1bef61d996f8065e529
Parents: 9b3c042
Author: Matthias Boehm <[email protected]>
Authored: Thu Jun 2 19:02:51 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jun 2 19:02:51 2016 -0700

----------------------------------------------------------------------
 .../ParameterizedBuiltinSPInstruction.java      | 81 +++++++++++---------
 1 file changed, 45 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/870da0f9/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index b8d2f6c..986d8e8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -291,46 +291,55 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        String rddInVar = params.get("target");
                        String rddOffVar = params.get("offset");
                        
-                       //get input rdd handle
-                       JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable( rddInVar );
-                       JavaPairRDD<MatrixIndexes,MatrixBlock> off;
-                       PartitionedBroadcastMatrix broadcastOff;
-                       MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(rddInVar);
                        boolean rows = sec.getScalarInput(params.get("margin"), 
ValueType.STRING, true).getStringValue().equals("rows");
                        long maxDim = sec.getScalarInput(params.get("maxdim"), 
ValueType.DOUBLE, false).getLongValue();
-                       long brlen = mcIn.getRowsPerBlock();
-                       long bclen = mcIn.getColsPerBlock();
-                       long numRep = (long)Math.ceil( rows ? 
(double)mcIn.getCols()/bclen : (double)mcIn.getRows()/brlen);
+                       MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(rddInVar);
                        
-                       //execute remove empty rows/cols operation
-                       JavaPairRDD<MatrixIndexes,MatrixBlock> out;
-
-                       if(_bRmEmptyBC){
-                               broadcastOff = 
sec.getBroadcastForVariable(rddOffVar );
-                               // Broadcast offset vector
-                               out = in
-                                       .flatMapToPair(new 
RDDRemoveEmptyFunctionInMem(rows, maxDim, brlen, bclen, broadcastOff));         
     
+                       if( maxDim > 0 ) //default case
+                       {
+                               //get input rdd handle
+                               JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable( rddInVar );
+                               JavaPairRDD<MatrixIndexes,MatrixBlock> off;
+                               PartitionedBroadcastMatrix broadcastOff;
+                               long brlen = mcIn.getRowsPerBlock();
+                               long bclen = mcIn.getColsPerBlock();
+                               long numRep = (long)Math.ceil( rows ? 
(double)mcIn.getCols()/bclen : (double)mcIn.getRows()/brlen);
+                               
+                               //execute remove empty rows/cols operation
+                               JavaPairRDD<MatrixIndexes,MatrixBlock> out;
+       
+                               if(_bRmEmptyBC){
+                                       broadcastOff = 
sec.getBroadcastForVariable( rddOffVar );
+                                       // Broadcast offset vector
+                                       out = in
+                                               .flatMapToPair(new 
RDDRemoveEmptyFunctionInMem(rows, maxDim, brlen, bclen, broadcastOff));         
     
+                               }
+                               else {
+                                       off = 
sec.getBinaryBlockRDDHandleForVariable( rddOffVar );
+                                       out = in
+                                               .join( off.flatMapToPair(new 
ReplicateVectorFunction(!rows,numRep)) )
+                                               .flatMapToPair(new 
RDDRemoveEmptyFunction(rows, maxDim, brlen, bclen));         
+                               }                               
+       
+                               out = RDDAggregateUtils.mergeByKey(out);
+                               
+                               //store output rdd handle
+                               sec.setRDDHandleForVariable(output.getName(), 
out);
+                               sec.addLineageRDD(output.getName(), rddInVar);
+                               if(!_bRmEmptyBC)
+                                       sec.addLineageRDD(output.getName(), 
rddOffVar);
+                               else
+                                       
sec.addLineageBroadcast(output.getName(), rddOffVar);
+                               
+                               //update output statistics (required for 
correctness)
+                               MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
+                               mcOut.set(rows?maxDim:mcIn.getRows(), 
rows?mcIn.getCols():maxDim, (int)brlen, (int)bclen, mcIn.getNonZeros());
+                       }
+                       else //special case: empty output (ensure valid dims)
+                       {
+                               MatrixBlock out = new 
MatrixBlock(rows?1:(int)mcIn.getRows(), rows?(int)mcIn.getCols():1, true); 
+                               sec.setMatrixOutput(output.getName(), out);
                        }
-                       else {
-                               off = sec.getBinaryBlockRDDHandleForVariable( 
rddOffVar );
-                               out = in
-                                       .join( off.flatMapToPair(new 
ReplicateVectorFunction(!rows,numRep)) )
-                                       .flatMapToPair(new 
RDDRemoveEmptyFunction(rows, maxDim, brlen, bclen));         
-                       }                               
-
-                       out = RDDAggregateUtils.mergeByKey(out);
-                       
-                       //store output rdd handle
-                       sec.setRDDHandleForVariable(output.getName(), out);
-                       sec.addLineageRDD(output.getName(), rddInVar);
-                       if(!_bRmEmptyBC)
-                               sec.addLineageRDD(output.getName(), rddOffVar);
-                       else
-                               sec.addLineageBroadcast(output.getName(), 
rddOffVar);           // TODO
-                       
-                       //update output statistics (required for correctness)
-                       MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
-                       mcOut.set(rows?maxDim:mcIn.getRows(), 
rows?mcIn.getCols():maxDim, (int)brlen, (int)bclen, mcIn.getNonZeros());
                }
                else if ( opcode.equalsIgnoreCase("replace") ) 
                {       

Reply via email to