[SYSTEMML-928] Fix spark append compilation for frames (only m/rappend)

For the Spark execution type, we support mappend, rappend, galignedappend,
and gappend for matrices, but only mappend and rappend for frames. This
patch fixes the related operator selection so that it is aware of this
restriction. Furthermore, it also includes various minor runtime cleanups
of the append instructions and frame converters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8d5f3cea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8d5f3cea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8d5f3cea

Branch: refs/heads/master
Commit: 8d5f3ceab60beb225f2f62882b772968c1ef29c5
Parents: 8631a14
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Fri Sep 16 23:17:40 2016 -0700
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Sat Sep 17 00:25:28 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/BinaryOp.java    |  7 ++-
 .../instructions/SPInstructionParser.java       | 26 ++++-------
 .../spark/AppendMSPInstruction.java             | 30 +++++++++++++
 .../spark/AppendRSPInstruction.java             | 30 +++++++++++++
 .../spark/FrameAppendMSPInstruction.java        | 24 -----------
 .../spark/FrameAppendRSPInstruction.java        | 23 ----------
 .../spark/MatrixAppendMSPInstruction.java       | 24 -----------
 .../spark/MatrixAppendRSPInstruction.java       | 23 ----------
 .../spark/utils/FrameRDDConverterUtils.java     | 45 +++++++++-----------
 9 files changed, 94 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/hops/BinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java 
b/src/main/java/org/apache/sysml/hops/BinaryOp.java
index edc327d..ab0315b 100644
--- a/src/main/java/org/apache/sysml/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java
@@ -1302,12 +1302,15 @@ public class BinaryOp extends Hop
                if( cbind && m1_dim2 >= 1 && m2_dim2 >= 0  //column dims known
                        && m1_dim2+m2_dim2 <= m1_cpb   //output has one column 
block
                  ||!cbind && m1_dim1 >= 1 && m2_dim1 >= 0 //row dims known
-                       && m1_dim1+m2_dim1 <= m1_rpb ) //output has one column 
block
+                       && m1_dim1+m2_dim1 <= m1_rpb   //output has one column 
block
+                 || dt == DataType.FRAME ) 
                {
                        return AppendMethod.MR_RAPPEND;
                }
                
-               // if(mc1.getCols() % mc1.getColsPerBlock() == 0) {
+               //note: below append methods are only supported for matrix, not 
frame
+               
+               //special case of block-aligned append line
                if( cbind && m1_dim2 % m1_cpb == 0 
                   || !cbind && m1_dim1 % m1_rpb == 0 ) 
                {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index c74b44e..a244288 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -34,12 +34,13 @@ import org.apache.sysml.lops.WeightedSquaredLoss;
 import org.apache.sysml.lops.WeightedSquaredLossR;
 import org.apache.sysml.lops.WeightedUnaryMM;
 import org.apache.sysml.lops.WeightedUnaryMMR;
-import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction;
@@ -53,13 +54,9 @@ import 
org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
@@ -81,7 +78,6 @@ import 
org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 
 public class SPInstructionParser extends InstructionParser 
@@ -392,24 +388,18 @@ public class SPInstructionParser extends InstructionParser
                        case MatrixReshape:
                                return 
MatrixReshapeSPInstruction.parseInstruction(str);
                                
-                       case MAppend:
-                               if(UtilFunctions.getDataType(str, 1) == 
DataType.MATRIX)
-                                       return 
MatrixAppendMSPInstruction.parseInstruction(str);
-                               else
-                                       return 
FrameAppendMSPInstruction.parseInstruction(str);
+                       case MAppend: //matrix/frame
+                               return 
AppendMSPInstruction.parseInstruction(str);
+                               
+                       case RAppend: //matrix/frame
+                               return 
AppendRSPInstruction.parseInstruction(str);
                        
-                       case GAppend:
+                       case GAppend: 
                                return 
AppendGSPInstruction.parseInstruction(str);
                        
                        case GAlignedAppend:
                                return 
AppendGAlignedSPInstruction.parseInstruction(str);
                                
-                       case RAppend:
-                               if(UtilFunctions.getDataType(str, 1) == 
DataType.MATRIX)
-                                       return 
MatrixAppendRSPInstruction.parseInstruction(str);
-                               else
-                                       return 
FrameAppendRSPInstruction.parseInstruction(str);
-                               
                        case Rand:
                                return RandSPInstruction.parseInstruction(str);
                                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index eaf23d5..ab70af2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -20,8 +20,12 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public abstract class AppendMSPInstruction extends BinarySPInstruction
 {
@@ -36,4 +40,30 @@ public abstract class AppendMSPInstruction extends 
BinarySPInstruction
                _cbind = cbind;
        }
 
+       public static AppendMSPInstruction parseInstruction( String str ) 
+               throws DMLRuntimeException 
+       {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields (parts, 5);
+               
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand offset = new CPOperand(parts[3]);
+               CPOperand out = new CPOperand(parts[4]);
+               boolean cbind = Boolean.parseBoolean(parts[5]);
+               
+               if(!opcode.equalsIgnoreCase("mappend"))
+                       throw new DMLRuntimeException("Unknown opcode while 
parsing a AppendMSPInstruction: " + str);
+               
+               //construct matrix/frame appendm instruction
+               if( in1.getDataType().isMatrix() ) {
+                       return new MatrixAppendMSPInstruction(new 
ReorgOperator(OffsetColumnIndex
+                                       .getOffsetColumnIndexFnObject(-1)), 
in1, in2, offset, out, cbind, opcode, str);
+               }
+               else { //frame                  
+                       return new FrameAppendMSPInstruction(new 
ReorgOperator(OffsetColumnIndex
+                                       .getOffsetColumnIndexFnObject(-1)), 
in1, in2, offset, out, cbind, opcode, str);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 6d3cf5e..b56b7d7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -19,8 +19,12 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 
 public abstract class AppendRSPInstruction extends BinarySPInstruction
@@ -33,5 +37,31 @@ public abstract class AppendRSPInstruction extends 
BinarySPInstruction
                _sptype = SPINSTRUCTION_TYPE.RAppend;
                _cbind = cbind;
        }
+
+       public static AppendRSPInstruction parseInstruction ( String str ) 
+               throws DMLRuntimeException 
+       {       
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields (parts, 4);
+               
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand out = new CPOperand(parts[3]);
+               boolean cbind = Boolean.parseBoolean(parts[4]);
+               
+               if(!opcode.equalsIgnoreCase("rappend"))
+                       throw new DMLRuntimeException("Unknown opcode while 
parsing a MatrixAppendRSPInstruction: " + str);
+               
+               if( in1.getDataType().isMatrix() ) {
+                       return new MatrixAppendRSPInstruction(new 
ReorgOperator(OffsetColumnIndex
+                                       .getOffsetColumnIndexFnObject(-1)), 
in1, in2, out, cbind, opcode, str);
+               }
+               else { //frame
+
+                       return new FrameAppendRSPInstruction(new 
ReorgOperator(OffsetColumnIndex
+                                       .getOffsetColumnIndexFnObject(-1)), 
in1, in2, out, cbind, opcode, str);
+               }
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
index b67f364..7aad0bf 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -30,14 +30,11 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class FrameAppendMSPInstruction extends AppendMSPInstruction
 {
@@ -46,27 +43,6 @@ public class FrameAppendMSPInstruction extends 
AppendMSPInstruction
                super(op, in1, in2, offset, out, cbind, opcode, istr);
        }
        
-       public static FrameAppendMSPInstruction parseInstruction ( String str ) 
-               throws DMLRuntimeException 
-       {
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               InstructionUtils.checkNumFields (parts, 5);
-               
-               String opcode = parts[0];
-               CPOperand in1 = new CPOperand(parts[1]);
-               CPOperand in2 = new CPOperand(parts[2]);
-               CPOperand offset = new CPOperand(parts[3]);
-               CPOperand out = new CPOperand(parts[4]);
-               boolean cbind = Boolean.parseBoolean(parts[5]);
-               
-               if(!opcode.equalsIgnoreCase("mappend"))
-                       throw new DMLRuntimeException("Unknown opcode while 
parsing a FrameAppendMSPInstruction: " + str);
-               
-               return new FrameAppendMSPInstruction(
-                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-                               in1, in2, offset, out, cbind, opcode, str);
-       }
-       
        @Override
        public void processInstruction(ExecutionContext ec)
                throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
index c627e40..067769d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -29,13 +29,10 @@ import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class FrameAppendRSPInstruction extends AppendRSPInstruction
 {
@@ -43,26 +40,6 @@ public class FrameAppendRSPInstruction extends 
AppendRSPInstruction
        {
                super(op, in1, in2, out, cbind, opcode, istr);
        }
-       
-       public static FrameAppendRSPInstruction parseInstruction ( String str ) 
-                       throws DMLRuntimeException 
-       {       
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               InstructionUtils.checkNumFields (parts, 4);
-               
-               String opcode = parts[0];
-               CPOperand in1 = new CPOperand(parts[1]);
-               CPOperand in2 = new CPOperand(parts[2]);
-               CPOperand out = new CPOperand(parts[3]);
-               boolean cbind = Boolean.parseBoolean(parts[4]);
-               
-               if(!opcode.equalsIgnoreCase("rappend"))
-                       throw new DMLRuntimeException("Unknown opcode while 
parsing a FrameAppendRSPInstruction: " + str);
-               
-               return new FrameAppendRSPInstruction(
-                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-                               in1, in2, out, cbind, opcode, str);
-       }
                
        @Override
        public void processInstruction(ExecutionContext ec)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
index 9e557bb..c4cd548 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -30,8 +30,6 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
@@ -42,7 +40,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class MatrixAppendMSPInstruction extends AppendMSPInstruction
 {
@@ -51,27 +48,6 @@ public class MatrixAppendMSPInstruction extends 
AppendMSPInstruction
                super(op, in1, in2, offset, out, cbind, opcode, istr);
        }
        
-       public static MatrixAppendMSPInstruction parseInstruction ( String str 
) 
-               throws DMLRuntimeException 
-       {
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               InstructionUtils.checkNumFields (parts, 5);
-               
-               String opcode = parts[0];
-               CPOperand in1 = new CPOperand(parts[1]);
-               CPOperand in2 = new CPOperand(parts[2]);
-               CPOperand offset = new CPOperand(parts[3]);
-               CPOperand out = new CPOperand(parts[4]);
-               boolean cbind = Boolean.parseBoolean(parts[5]);
-               
-               if(!opcode.equalsIgnoreCase("mappend"))
-                       throw new DMLRuntimeException("Unknown opcode while 
parsing a MatrixAppendMSPInstruction: " + str);
-               
-               return new MatrixAppendMSPInstruction(
-                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-                               in1, in2, offset, out, cbind, opcode, str);
-       }
-       
        @Override
        public void processInstruction(ExecutionContext ec)
                throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
index 644fcd2..779e5ab 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
@@ -27,13 +27,10 @@ import scala.Tuple2;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
 public class MatrixAppendRSPInstruction extends AppendRSPInstruction
 {
@@ -42,26 +39,6 @@ public class MatrixAppendRSPInstruction extends 
AppendRSPInstruction
                super(op, in1, in2, out, cbind, opcode, istr);
        }
        
-       public static MatrixAppendRSPInstruction parseInstruction ( String str 
) 
-               throws DMLRuntimeException 
-       {       
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               InstructionUtils.checkNumFields (parts, 4);
-               
-               String opcode = parts[0];
-               CPOperand in1 = new CPOperand(parts[1]);
-               CPOperand in2 = new CPOperand(parts[2]);
-               CPOperand out = new CPOperand(parts[3]);
-               boolean cbind = Boolean.parseBoolean(parts[4]);
-               
-               if(!opcode.equalsIgnoreCase("rappend"))
-                       throw new DMLRuntimeException("Unknown opcode while 
parsing a MatrixAppendRSPInstruction: " + str);
-               
-               return new MatrixAppendRSPInstruction(
-                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-                               in1, in2, out, cbind, opcode, str);
-       }
-       
        @Override
        public void processInstruction(ExecutionContext ec)
                throws DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8d5f3cea/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3ac1daf..351d559 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -160,7 +160,8 @@ public class FrameRDDConverterUtils
         * @param strict
         * @return
         */
-       public static JavaRDD<String> 
binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, 
CSVFileFormatProperties props, boolean strict)
+       public static JavaRDD<String> 
binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, 
+                       MatrixCharacteristics mcIn, CSVFileFormatProperties 
props, boolean strict)
        {
                JavaPairRDD<Long,FrameBlock> input = in;
                
@@ -193,16 +194,12 @@ public class FrameRDDConverterUtils
                        JavaPairRDD<LongWritable, Text> in, 
MatrixCharacteristics mcOut, List<ValueType> schema ) 
                throws DMLRuntimeException  
        {
-               //replicate schema entry if necessary
-               List<ValueType> lschema = (schema.size()==1 && 
mcOut.getCols()>1) ?
-                               Collections.nCopies((int)mcOut.getCols(), 
schema.get(0)) : schema;
-               
                //convert input rdd to serializable long/frame block
                JavaPairRDD<Long,Text> input = 
                                in.mapToPair(new 
LongWritableTextToLongTextFunction());
                
                //do actual conversion
-               return textCellToBinaryBlockLongIndex(sc, input, mcOut, 
lschema);
+               return textCellToBinaryBlockLongIndex(sc, input, mcOut, schema);
        }
 
        /**
@@ -215,12 +212,18 @@ public class FrameRDDConverterUtils
         * @throws DMLRuntimeException
         */
        public static JavaPairRDD<Long, FrameBlock> 
textCellToBinaryBlockLongIndex(JavaSparkContext sc,
-                       JavaPairRDD<Long, Text> input, MatrixCharacteristics 
mcOut, List<ValueType> schema ) 
+                       JavaPairRDD<Long, Text> input, MatrixCharacteristics 
mc, List<ValueType> schema ) 
                throws DMLRuntimeException  
        {
+               //prepare default schema if needed
+               if( schema == null || schema.size()==1 ) {
+                       schema = Collections.nCopies((int)mc.getCols(), 
+                               (schema!=null) ? schema.get(0) : 
ValueType.STRING);
+               }
                
                //convert textcell rdd to binary block rdd (w/ partial blocks)
-               JavaPairRDD<Long, FrameBlock> output = 
input.values().mapPartitionsToPair(new TextToBinaryBlockFunction( mcOut, schema 
));
+               JavaPairRDD<Long, FrameBlock> output = input.values()
+                               .mapPartitionsToPair(new 
TextToBinaryBlockFunction( mc, schema ));
                
                //aggregate partial matrix blocks
                JavaPairRDD<Long,FrameBlock> out = 
@@ -259,14 +262,9 @@ public class FrameRDDConverterUtils
                        JavaPairRDD<MatrixIndexes, MatrixBlock> input, 
MatrixCharacteristics mcIn)
                throws DMLRuntimeException 
        {
-               //Do actual conversion
-               JavaPairRDD<Long, FrameBlock> output = 
matrixBlockToBinaryBlockLongIndex(sc,input, mcIn);
-               
-               //convert input rdd to serializable LongWritable/frame block
-               JavaPairRDD<LongWritable,FrameBlock> out = 
-                               output.mapToPair(new 
LongFrameToLongWritableFrameFunction());
-               
-               return out;
+               //convert and map to serializable LongWritable/frame block
+               return matrixBlockToBinaryBlockLongIndex(sc,input, mcIn)
+                       .mapToPair(new LongFrameToLongWritableFrameFunction());
        }
        
 
@@ -285,16 +283,17 @@ public class FrameRDDConverterUtils
                JavaPairRDD<Long, FrameBlock> out = null;
                
                if(mcIn.getCols() > mcIn.getColsPerBlock()) {
-                       
+                       //convert matrix binary block to frame binary block
                        out = input.flatMapToPair(new 
MatrixToBinaryBlockFunction(mcIn));
                        
                        //aggregate partial frame blocks
-                       if(mcIn.getCols() > mcIn.getColsPerBlock())
-                               out = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey( out );
+                       out = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey( out );
                }
-               else
+               else {
+                       //convert single matrix binary block to frame binary 
block (w/o shuffle)
                        out = input.mapToPair(new 
MatrixToBinaryBlockOneColumnBlockFunction(mcIn));
-               
+               }
+                       
                return out;
        }
        
@@ -725,10 +724,8 @@ public class FrameRDDConverterUtils
                private void flushBlocksToList( Long ix, FrameBlock fb, 
ArrayList<Tuple2<Long,FrameBlock>> ret ) 
                        throws DMLRuntimeException
                {                       
-                       if( fb != null && fb.getNumRows()>0 ) {
-                               fb.setSchema(_schema); //use shared schema
+                       if( fb != null && fb.getNumRows()>0 )
                                ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
-                       }
                }
        }
        

Reply via email to