Repository: incubator-systemml
Updated Branches:
  refs/heads/master 09477a7b0 -> c22f239e3


[SYSTEMML-562] Frame Append operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c22f239e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c22f239e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c22f239e

Branch: refs/heads/master
Commit: c22f239e3fd3b526190812919960684bfcf1a715
Parents: 09477a7
Author: Arvind Surve <[email protected]>
Authored: Mon Jul 18 21:10:08 2016 -0700
Committer: Arvind Surve <[email protected]>
Committed: Mon Jul 18 21:10:09 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/BinaryOp.java    |   7 +-
 .../controlprogram/caching/CacheableData.java   |  39 +++
 .../controlprogram/caching/MatrixObject.java    |  44 ---
 .../instructions/SPInstructionParser.java       |  18 +-
 .../instructions/cp/VariableCPInstruction.java  |  18 +-
 .../spark/AppendMSPInstruction.java             | 257 +----------------
 .../spark/AppendRSPInstruction.java             |  85 +-----
 .../spark/FrameAppendMSPInstruction.java        | 157 ++++++++++
 .../spark/FrameAppendRSPInstruction.java        | 170 +++++++++++
 .../spark/MatrixAppendMSPInstruction.java       | 284 +++++++++++++++++++
 .../spark/MatrixAppendRSPInstruction.java       | 112 ++++++++
 .../sysml/runtime/matrix/data/FrameBlock.java   |   2 +
 .../sysml/runtime/util/UtilFunctions.java       |  19 ++
 .../functions/frame/FrameAppendDistTest.java    | 226 +++++++++++++++
 src/test/scripts/functions/frame/FrameAppend.R  |  33 +++
 .../scripts/functions/frame/FrameAppend.dml     |  29 ++
 16 files changed, 1108 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/hops/BinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java 
b/src/main/java/org/apache/sysml/hops/BinaryOp.java
index 94de0e7..65e9232 100644
--- a/src/main/java/org/apache/sysml/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java
@@ -1125,7 +1125,7 @@ public class BinaryOp extends Hop
                
                Lop offset = createOffsetLop( left, cbind ); //offset 1st input
                AppendMethod am = optFindAppendSPMethod(left.getDim1(), 
left.getDim2(), right.getDim1(), right.getDim2(), 
-                               right.getRowsInBlock(), right.getColsInBlock(), 
right.getNnz(), cbind);
+                               right.getRowsInBlock(), right.getColsInBlock(), 
right.getNnz(), cbind, dt);
        
                switch( am )
                {
@@ -1281,16 +1281,17 @@ public class BinaryOp extends Hop
                return AppendMethod.MR_GAPPEND;         
        }
        
-       private static AppendMethod optFindAppendSPMethod( long m1_dim1, long 
m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, 
boolean cbind )
+       private static AppendMethod optFindAppendSPMethod( long m1_dim1, long 
m1_dim2, long m2_dim1, long m2_dim2, long m1_rpb, long m1_cpb, long m2_nnz, 
boolean cbind, DataType dt )
        {
                if(FORCED_APPEND_METHOD != null) {
                        return FORCED_APPEND_METHOD;
                }
                
                //check for best case (map-only w/o shuffle)            
-               if(    m2_dim1 >= 1 && m2_dim2 >= 1   //rhs dims known          
                
+               if((    m2_dim1 >= 1 && m2_dim2 >= 1   //rhs dims known         
                        
                        && (cbind && m2_dim2 <= m1_cpb    //rhs is smaller than 
column block 
                        || !cbind && m2_dim1 <= m1_rpb) ) //rhs is smaller than 
row block
+                       && ((dt == DataType.MATRIX) || (dt == DataType.FRAME && 
cbind)))
                {
                        if( 
OptimizerUtils.checkSparkBroadcastMemoryBudget(m2_dim1, m2_dim2, m1_rpb, 
m1_cpb, m2_nnz) ) {
                                return AppendMethod.MR_MAPPEND;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index d60c607..2b45ddd 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -1422,4 +1422,43 @@ public abstract class CacheableData<T extends 
CacheBlock> extends Data
        public static synchronized void enableCaching() {
                _activeFlag = true;
        }
+
+       /**
+        * 
+        * @param fName
+        * @param outputFormat
+        * @return
+        * @throws CacheException
+        */
+       public synchronized boolean moveData(String fName, String outputFormat) 
+               throws CacheException 
+       {       
+               boolean ret = false;
+               
+               try
+               {
+                       //export or rename to target file on hdfs
+                       if( (isDirty() || (!isEqualOutputFormat(outputFormat) 
&& isEmpty(true))) ||
+                               (getRDDHandle() != null && 
!MapReduceTool.existsFileOnHDFS(_hdfsFileName)))
+                       {
+                               exportData(fName, outputFormat);
+                               ret = true;
+                       }
+                       else if( isEqualOutputFormat(outputFormat) )
+                       {
+                               MapReduceTool.deleteFileIfExistOnHDFS(fName);
+                               
MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd");
+                               MapReduceTool.renameFileOnHDFS( _hdfsFileName, 
fName );
+                               writeMetaData( fName, outputFormat, null );
+                               ret = true;
+                       }                               
+               }
+               catch (Exception e)
+               {
+                       throw new CacheException ("Move to " + fName + " 
failed.", e);
+               }
+               
+               return ret;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 5d5f41f..4148545 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -282,50 +282,6 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
                
                return str.toString();
        }
-
-       /**
-        * 
-        * @param fName
-        * @param outputFormat
-        * @return
-        * @throws CacheException
-        */
-       public synchronized boolean moveData(String fName, String outputFormat) 
-               throws CacheException 
-       {       
-               boolean ret = false;
-               
-               try
-               {
-                       //ensure input file is persistent on hdfs (pending RDD 
operations), 
-                       //file might have been written during export or collect 
via write/read
-                       if( getRDDHandle() != null && 
!MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { 
-                               writeBlobFromRDDtoHDFS(getRDDHandle(), 
_hdfsFileName, outputFormat);
-                       }
-                       
-                       //export or rename to target file on hdfs
-                       if( isDirty() || (!isEqualOutputFormat(outputFormat) && 
isEmpty(true))) 
-                       {
-                               exportData(fName, outputFormat);
-                               ret = true;
-                       }
-                       else if( isEqualOutputFormat(outputFormat) )
-                       {
-                               MapReduceTool.deleteFileIfExistOnHDFS(fName);
-                               
MapReduceTool.deleteFileIfExistOnHDFS(fName+".mtd");
-                               MapReduceTool.renameFileOnHDFS( _hdfsFileName, 
fName );
-                               writeMetaData( fName, outputFormat, null );
-                               ret = true;
-                       }                               
-               }
-               catch (Exception e)
-               {
-                       throw new CacheException ("Move to " + fName + " 
failed.", e);
-               }
-               
-               return ret;
-       }
-       
        
        // *********************************************
        // ***                                       ***

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index e8437fb..c74b44e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -34,13 +34,12 @@ import org.apache.sysml.lops.WeightedSquaredLoss;
 import org.apache.sysml.lops.WeightedSquaredLossR;
 import org.apache.sysml.lops.WeightedUnaryMM;
 import org.apache.sysml.lops.WeightedUnaryMMR;
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.instructions.spark.AggregateTernarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AggregateUnarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGAlignedSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.AppendMSPInstruction;
-import org.apache.sysml.runtime.instructions.spark.AppendRSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.ArithmeticBinarySPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BinUaggChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.BuiltinBinarySPInstruction;
@@ -54,9 +53,13 @@ import 
org.apache.sysml.runtime.instructions.spark.CovarianceSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.CpmmSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.CumulativeAggregateSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.CumulativeOffsetSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.FrameAppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.FrameAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.IndexingSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.MatrixAppendMSPInstruction;
+import org.apache.sysml.runtime.instructions.spark.MatrixAppendRSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.MatrixReshapeSPInstruction;
 import 
org.apache.sysml.runtime.instructions.spark.MultiReturnParameterizedBuiltinSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.PMapmmSPInstruction;
@@ -78,6 +81,7 @@ import 
org.apache.sysml.runtime.instructions.spark.QuantileSortSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.UaggOuterChainSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.WriteSPInstruction;
 import org.apache.sysml.runtime.instructions.spark.ZipmmSPInstruction;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 
 public class SPInstructionParser extends InstructionParser 
@@ -389,7 +393,10 @@ public class SPInstructionParser extends InstructionParser
                                return 
MatrixReshapeSPInstruction.parseInstruction(str);
                                
                        case MAppend:
-                               return 
AppendMSPInstruction.parseInstruction(str);
+                               if(UtilFunctions.getDataType(str, 1) == 
DataType.MATRIX)
+                                       return 
MatrixAppendMSPInstruction.parseInstruction(str);
+                               else
+                                       return 
FrameAppendMSPInstruction.parseInstruction(str);
                        
                        case GAppend:
                                return 
AppendGSPInstruction.parseInstruction(str);
@@ -398,7 +405,10 @@ public class SPInstructionParser extends InstructionParser
                                return 
AppendGAlignedSPInstruction.parseInstruction(str);
                                
                        case RAppend:
-                               return 
AppendRSPInstruction.parseInstruction(str);
+                               if(UtilFunctions.getDataType(str, 1) == 
DataType.MATRIX)
+                                       return 
MatrixAppendRSPInstruction.parseInstruction(str);
+                               else
+                                       return 
FrameAppendRSPInstruction.parseInstruction(str);
                                
                        case Rand:
                                return RandSPInstruction.parseInstruction(str);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
index 1fae8fc..ee86b59 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
@@ -28,6 +28,7 @@ import org.apache.sysml.lops.UnaryCP;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -655,6 +656,7 @@ public class VariableCPInstruction extends CPInstruction
         * @param ec
         * @throws DMLRuntimeException
         */
+       @SuppressWarnings("rawtypes")
        private void processMoveInstruction(ExecutionContext ec) throws 
DMLRuntimeException {
                
                if ( input3 == null ) {
@@ -683,16 +685,22 @@ public class VariableCPInstruction extends CPInstruction
                        if ( ec.getVariable(input1.getName()) == null ) 
                                throw new DMLRuntimeException("Unexpected 
error: could not find a data object for variable name:" + input1.getName() + ", 
while processing instruction " +this.toString());
                        
-                       MatrixObject mo = (MatrixObject) 
ec.getVariable(input1.getName());
+                       Object object = ec.getVariable(input1.getName());
+                       
                        if ( input3.getName().equalsIgnoreCase("binaryblock") ) 
{
-                               boolean success = mo.moveData(input2.getName(), 
input3.getName());
+                               boolean success = false;
+                               success = 
((CacheableData)object).moveData(input2.getName(), input3.getName());
                                if (!success) {
                                        throw new DMLRuntimeException("Failed 
to move var " + input1.getName() + " to file " + input2.getName() + ".");
                                }
                        }
-                       else 
-                               throw new DMLRuntimeException("Unexpected 
formats while copying: from blocks [" 
-                                                       + 
mo.getNumRowsPerBlock() + "," + mo.getNumColumnsPerBlock() + "] to " + 
input3.getName());
+                       else
+                               if(object instanceof MatrixObject)
+                                       throw new 
DMLRuntimeException("Unexpected formats while copying: from matrix blocks [" 
+                                                       + 
((MatrixObject)object).getNumRowsPerBlock() + "," + 
((MatrixObject)object).getNumColumnsPerBlock() + "] to " + input3.getName());
+                               else if (object instanceof FrameObject)
+                                       throw new 
DMLRuntimeException("Unexpected formats while copying: from fram object [" 
+                                                       + 
((FrameObject)object).getNumColumns() + "," + 
((FrameObject)object).getNumColumns() + "] to " + input3.getName());
                }
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index 26be3eb..eaf23d5 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -19,35 +19,14 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
-import java.util.ArrayList;
-import java.util.Iterator;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
-import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
-import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
-public class AppendMSPInstruction extends BinarySPInstruction
+public abstract class AppendMSPInstruction extends BinarySPInstruction
 {
-       private CPOperand _offset = null;
-       private boolean _cbind = true;
+       protected CPOperand _offset = null;
+       protected boolean _cbind = true;
        
        public AppendMSPInstruction(Operator op, CPOperand in1, CPOperand in2, 
CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
        {
@@ -56,235 +35,5 @@ public class AppendMSPInstruction extends 
BinarySPInstruction
                _offset = offset;
                _cbind = cbind;
        }
-       
-       public static AppendMSPInstruction parseInstruction ( String str ) 
-               throws DMLRuntimeException 
-       {
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               InstructionUtils.checkNumFields (parts, 5);
-               
-               String opcode = parts[0];
-               CPOperand in1 = new CPOperand(parts[1]);
-               CPOperand in2 = new CPOperand(parts[2]);
-               CPOperand offset = new CPOperand(parts[3]);
-               CPOperand out = new CPOperand(parts[4]);
-               boolean cbind = Boolean.parseBoolean(parts[5]);
-               
-               if(!opcode.equalsIgnoreCase("mappend"))
-                       throw new DMLRuntimeException("Unknown opcode while 
parsing a AppendMSPInstruction: " + str);
-               
-               return new AppendMSPInstruction(
-                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-                               in1, in2, offset, out, cbind, opcode, str);
-       }
-       
-       @Override
-       public void processInstruction(ExecutionContext ec)
-               throws DMLRuntimeException 
-       {
-               // map-only append (rhs must be vector and fit in mapper mem)
-               SparkExecutionContext sec = (SparkExecutionContext)ec;
-               checkBinaryAppendInputCharacteristics(sec, _cbind, false, 
false);
-               MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());
-               MatrixCharacteristics mc2 = 
sec.getMatrixCharacteristics(input2.getName());
-               int brlen = mc1.getRowsPerBlock();
-               int bclen = mc1.getColsPerBlock();
-               
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-               PartitionedBroadcast<MatrixBlock> in2 = 
sec.getBroadcastForVariable( input2.getName() );
-               long off = sec.getScalarInput( _offset.getName(), 
_offset.getValueType(), _offset.isLiteral()).getLongValue();
-               
-               //execute map-append operations (partitioning preserving if 
#in-blocks = #out-blocks)
-               JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-               if( preservesPartitioning(mc1, mc2, _cbind) ) {
-                       out = in1.mapPartitionsToPair(
-                                       new MapSideAppendPartitionFunction(in2, 
_cbind, off, brlen, bclen), true);
-               }
-               else {
-                       out = in1.flatMapToPair(
-                                       new MapSideAppendFunction(in2, _cbind, 
off, brlen, bclen));
-               }
-               
-               //put output RDD handle into symbol table
-               updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
-               sec.setRDDHandleForVariable(output.getName(), out);
-               sec.addLineageRDD(output.getName(), input1.getName());
-               sec.addLineageBroadcast(output.getName(), input2.getName());
-       }
-       
-       /**
-        * 
-        * @param mcIn1
-        * @param mcIn2
-        * @return
-        */
-       private boolean preservesPartitioning( MatrixCharacteristics mcIn1, 
MatrixCharacteristics mcIn2, boolean cbind )
-       {
-               long ncblksIn1 = cbind ?
-                               
(long)Math.ceil((double)mcIn1.getCols()/mcIn1.getColsPerBlock()) : 
-                               
(long)Math.ceil((double)mcIn1.getRows()/mcIn1.getRowsPerBlock());
-               long ncblksOut = cbind ? 
-                               
(long)Math.ceil(((double)mcIn1.getCols()+mcIn2.getCols())/mcIn1.getColsPerBlock())
 : 
-                               
(long)Math.ceil(((double)mcIn1.getRows()+mcIn2.getRows())/mcIn1.getRowsPerBlock());
-               
-               //mappend is partitioning-preserving if in-block append (e.g., 
common case of colvector append)
-               return (ncblksIn1 == ncblksOut);
-       }
-       
-       /**
-        * 
-        */
-       private static class MapSideAppendFunction implements  
PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, 
MatrixBlock> 
-       {
-               private static final long serialVersionUID = 
2738541014432173450L;
-               
-               private PartitionedBroadcast<MatrixBlock> _pm = null;
-               private boolean _cbind = true;
-               private long _offset; 
-               private int _brlen; 
-               private int _bclen;
-               private long _lastBlockColIndex;
-               
-               public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> 
binput, boolean cbind, long offset, int brlen, int bclen)  
-               {
-                       _pm = binput;
-                       _cbind = cbind;
-                       
-                       _offset = offset;
-                       _brlen = brlen;
-                       _bclen = bclen;
-                       
-                       //check for boundary block
-                       int blen = cbind ? bclen : brlen;
-                       _lastBlockColIndex = 
(long)Math.ceil((double)_offset/blen);                     
-               }
-               
-               @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
-                       throws Exception 
-               {
-                       ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
-                       
-                       IndexedMatrixValue in1 = 
SparkUtils.toIndexedMatrixBlock(kv);
-                       MatrixIndexes ix = in1.getIndexes();
-                       
-                       //case 1: pass through of non-boundary blocks
-                       if( 
(_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex ) 
-                       {
-                               ret.add( kv );
-                       }
-                       //case 2: pass through full input block and rhs block 
-                       else if( _cbind && in1.getValue().getNumColumns() == 
_bclen 
-                                       || !_cbind && 
in1.getValue().getNumRows() == _brlen) 
-                       {                               
-                               //output lhs block
-                               ret.add( kv );
-                               
-                               //output shallow copy of rhs block
-                               if( _cbind ) {
-                                       ret.add( new Tuple2<MatrixIndexes, 
MatrixBlock>(
-                                                       new 
MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
-                                                       
_pm.getBlock((int)ix.getRowIndex(), 1)) );
-                               }
-                               else { //rbind
-                                       ret.add( new Tuple2<MatrixIndexes, 
MatrixBlock>(
-                                                       new 
MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
-                                                       _pm.getBlock(1, 
(int)ix.getColumnIndex())) );   
-                               }
-                       }
-                       //case 3: append operation on boundary block
-                       else 
-                       {
-                               //allocate space for the output value
-                               ArrayList<IndexedMatrixValue> outlist=new 
ArrayList<IndexedMatrixValue>(2);
-                               IndexedMatrixValue first = new 
IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock());
-                               outlist.add(first);
-                               
-                               MatrixBlock value_in2 = null;
-                               if( _cbind ) {
-                                       value_in2 = 
_pm.getBlock((int)ix.getRowIndex(), 1);
-                                       
if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
-                                               IndexedMatrixValue second=new 
IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
-                                               
second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
-                                               outlist.add(second);
-                                       }
-                               }
-                               else { //rbind
-                                       value_in2 = _pm.getBlock(1, 
(int)ix.getColumnIndex());
-                                       
if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
-                                               IndexedMatrixValue second=new 
IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
-                                               
second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
-                                               outlist.add(second);
-                                       }
-                               }
-       
-                               
OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, 
_brlen, _bclen, _cbind, true, 0);    
-                               
ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
-                       }
-                       
-                       return ret;
-               }
-       }
-       
-       /**
-        * 
-        */
-       private static class MapSideAppendPartitionFunction implements  
PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, 
MatrixBlock> 
-       {
-               private static final long serialVersionUID = 
5767240739761027220L;
-
-               private PartitionedBroadcast<MatrixBlock> _pm = null;
-               private boolean _cbind = true;
-               private long _lastBlockColIndex = -1;
-               
-               public 
MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, 
boolean cbind, long offset, int brlen, int bclen)  
-               {
-                       _pm = binput;
-                       _cbind = cbind;
-                       
-                       //check for boundary block
-                       int blen = cbind ? bclen : brlen;
-                       _lastBlockColIndex = 
(long)Math.ceil((double)offset/blen);                      
-               }
 
-               @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
-                       throws Exception 
-               {
-                       return new MapAppendPartitionIterator(arg0);
-               }
-               
-               /**
-                * Lazy mappend iterator to prevent materialization of entire 
partition output in-memory.
-                * The implementation via mapPartitions is required to preserve 
partitioning information,
-                * which is important for performance. 
-                */
-               private class MapAppendPartitionIterator extends 
LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
-               {
-                       public 
MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
-                               super(in);
-                       }
-
-                       @Override
-                       protected Tuple2<MatrixIndexes, MatrixBlock> 
computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
-                               throws Exception
-                       {
-                               MatrixIndexes ix = arg._1();
-                               MatrixBlock in1 = arg._2();
-                               
-                               //case 1: pass through of non-boundary blocks
-                               if( 
(_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) {
-                                       return arg;
-                               }
-                               //case 3: append operation on boundary block
-                               else {
-                                       int rowix = _cbind ? 
(int)ix.getRowIndex() : 1;
-                                       int colix = _cbind ? 1 : 
(int)ix.getColumnIndex();                                      
-                                       MatrixBlock in2 = _pm.getBlock(rowix, 
colix);
-                                       MatrixBlock out = 
in1.appendOperations(in2, new MatrixBlock(), _cbind);
-                                       return new 
Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
-                               }       
-                       }                       
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 93fc7aa..6d3cf5e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -19,25 +19,13 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-
-import scala.Tuple2;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
 
-public class AppendRSPInstruction extends BinarySPInstruction
+
+public abstract class AppendRSPInstruction extends BinarySPInstruction
 {
-       private boolean _cbind = true;
+       protected boolean _cbind = true;
        
        public AppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, 
CPOperand out, boolean cbind, String opcode, String istr)
        {
@@ -45,72 +33,5 @@ public class AppendRSPInstruction extends BinarySPInstruction
                _sptype = SPINSTRUCTION_TYPE.RAppend;
                _cbind = cbind;
        }
-       
-       public static AppendRSPInstruction parseInstruction ( String str ) 
-               throws DMLRuntimeException 
-       {       
-               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
-               InstructionUtils.checkNumFields (parts, 4);
-               
-               String opcode = parts[0];
-               CPOperand in1 = new CPOperand(parts[1]);
-               CPOperand in2 = new CPOperand(parts[2]);
-               CPOperand out = new CPOperand(parts[3]);
-               boolean cbind = Boolean.parseBoolean(parts[4]);
-               
-               if(!opcode.equalsIgnoreCase("rappend"))
-                       throw new DMLRuntimeException("Unknown opcode while 
parsing a AppendRSPInstruction: " + str);
-               
-               return new AppendRSPInstruction(
-                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
-                               in1, in2, out, cbind, opcode, str);
-       }
-       
-       @Override
-       public void processInstruction(ExecutionContext ec)
-               throws DMLRuntimeException 
-       {
-               // reduce-only append (output must have at most one column 
block)
-               SparkExecutionContext sec = (SparkExecutionContext)ec;
-               checkBinaryAppendInputCharacteristics(sec, _cbind, true, false);
-               
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-               
-               //execute reduce-append operations (partitioning preserving)
-               JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
-                               .join(in2)
-                               .mapValues(new 
ReduceSideAppendFunction(_cbind));
-
-               //put output RDD handle into symbol table
-               updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
-               sec.setRDDHandleForVariable(output.getName(), out);
-               sec.addLineageRDD(output.getName(), input1.getName());
-               sec.addLineageRDD(output.getName(), input2.getName());          
-       }
-       
-       /**
-        * 
-        */
-       private static class ReduceSideAppendFunction implements 
Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> 
-       {
-               private static final long serialVersionUID = 
-6763904972560309095L;
-
-               private boolean _cbind = true;
-                               
-               public ReduceSideAppendFunction(boolean cbind) {
-                       _cbind = cbind;
-               }
-               
-               @Override
-               public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
-                       throws Exception 
-               {
-                       MatrixBlock left = arg0._1();
-                       MatrixBlock right = arg0._2();
-                       
-                       return left.appendOperations(right, new MatrixBlock(), 
_cbind);
-               }
-       }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
new file mode 100644
index 0000000..b67f364
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendMSPInstruction.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.Iterator;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+public class FrameAppendMSPInstruction extends AppendMSPInstruction
+{
+       public FrameAppendMSPInstruction(Operator op, CPOperand in1, CPOperand 
in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
+       {
+               super(op, in1, in2, offset, out, cbind, opcode, istr);
+       }
+       
+       public static FrameAppendMSPInstruction parseInstruction ( String str ) 
+               throws DMLRuntimeException 
+       {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields (parts, 5);
+               
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand offset = new CPOperand(parts[3]);
+               CPOperand out = new CPOperand(parts[4]);
+               boolean cbind = Boolean.parseBoolean(parts[5]);
+               
+               if(!opcode.equalsIgnoreCase("mappend"))
+                       throw new DMLRuntimeException("Unknown opcode while 
parsing a FrameAppendMSPInstruction: " + str);
+               
+               return new FrameAppendMSPInstruction(
+                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
+                               in1, in2, offset, out, cbind, opcode, str);
+       }
+       
+       @Override
+       public void processInstruction(ExecutionContext ec)
+               throws DMLRuntimeException 
+       {
+               // map-only append (rhs must be vector and fit in mapper mem)
+               SparkExecutionContext sec = (SparkExecutionContext)ec;
+               checkBinaryAppendInputCharacteristics(sec, _cbind, false, 
false);
+               
+               JavaPairRDD<Long,FrameBlock> in1 = 
sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() );
+               PartitionedBroadcast<FrameBlock> in2 = 
sec.getBroadcastForFrameVariable( input2.getName() );
+               
+               //execute map-append operations (partitioning preserving if 
keys for blocks not changing)
+               JavaPairRDD<Long,FrameBlock> out = null;
+               if( preservesPartitioning(_cbind) ) {
+                       out = in1.mapPartitionsToPair(
+                                       new 
MapSideAppendPartitionFunction(in2), true);
+               }
+               else 
+                       throw new DMLRuntimeException("Append type rbind not 
supported for frame mappend, instead use rappend");
+               
+               //put output RDD handle into symbol table
+               updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+               sec.setRDDHandleForVariable(output.getName(), out);
+               sec.addLineageRDD(output.getName(), input1.getName());
+               sec.addLineageBroadcast(output.getName(), input2.getName());
+       }
+       
+       /** 
+        * 
+        * @param cbind
+        * @return
+        */
+       private boolean preservesPartitioning( boolean cbind )
+       {
+               //Partitions for input1 will be preserved in case of cbind, 
+               // where as in case of rbind partitions will not be preserved.
+               return cbind;
+       }
+       
+       /**
+        * 
+        */
+       private static class MapSideAppendPartitionFunction implements  
PairFlatMapFunction<Iterator<Tuple2<Long,FrameBlock>>, Long, FrameBlock> 
+       {
+               private static final long serialVersionUID = 
-3997051891171313830L;
+
+               private PartitionedBroadcast<FrameBlock> _pm = null;
+               
+               public 
MapSideAppendPartitionFunction(PartitionedBroadcast<FrameBlock> binput)  
+               {
+                       _pm = binput;
+               }
+
+               @Override
+               public Iterable<Tuple2<Long, FrameBlock>> 
call(Iterator<Tuple2<Long, FrameBlock>> arg0)
+                       throws Exception 
+               {
+                       return new MapAppendPartitionIterator(arg0);
+               }
+               
+               /**
+                * Lazy mappend iterator to prevent materialization of entire 
partition output in-memory.
+                * The implementation via mapPartitions is required to preserve 
partitioning information,
+                * which is important for performance. 
+                */
+               private class MapAppendPartitionIterator extends 
LazyIterableIterator<Tuple2<Long, FrameBlock>>
+               {
+                       public MapAppendPartitionIterator(Iterator<Tuple2<Long, 
FrameBlock>> in) {
+                               super(in);
+                       }
+
+                       @Override
+                       protected Tuple2<Long, FrameBlock> 
computeNext(Tuple2<Long, FrameBlock> arg)
+                               throws Exception
+                       {
+                               Long ix = arg._1();
+                               FrameBlock in1 = arg._2();
+                       
+                               int rowix = 
(ix.intValue()-1)/OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1;
+                               int colix = 1;
+                               
+                               FrameBlock in2 = _pm.getBlock(rowix, colix);
+                               FrameBlock out = in1.appendOperations(in2, new 
FrameBlock(), true); //cbind
+                               return new Tuple2<Long,FrameBlock>(ix, out);
+                       }                       
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
new file mode 100644
index 0000000..c627e40
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameAppendRSPInstruction.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+public class FrameAppendRSPInstruction extends AppendRSPInstruction
+{
+       public FrameAppendRSPInstruction(Operator op, CPOperand in1, CPOperand 
in2, CPOperand out, boolean cbind, String opcode, String istr)
+       {
+               super(op, in1, in2, out, cbind, opcode, istr);
+       }
+       
+       public static FrameAppendRSPInstruction parseInstruction ( String str ) 
+                       throws DMLRuntimeException 
+       {       
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields (parts, 4);
+               
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand out = new CPOperand(parts[3]);
+               boolean cbind = Boolean.parseBoolean(parts[4]);
+               
+               if(!opcode.equalsIgnoreCase("rappend"))
+                       throw new DMLRuntimeException("Unknown opcode while 
parsing a FrameAppendRSPInstruction: " + str);
+               
+               return new FrameAppendRSPInstruction(
+                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
+                               in1, in2, out, cbind, opcode, str);
+       }
+               
+       @Override
+       public void processInstruction(ExecutionContext ec)
+               throws DMLRuntimeException 
+       {
+               SparkExecutionContext sec = (SparkExecutionContext)ec;
+               JavaPairRDD<Long,FrameBlock> in1 = 
sec.getFrameBinaryBlockRDDHandleForVariable( input1.getName() );
+               JavaPairRDD<Long,FrameBlock> in2 = 
sec.getFrameBinaryBlockRDDHandleForVariable( input2.getName() );
+               JavaPairRDD<Long,FrameBlock> out = null;
+               long leftRows = 
sec.getMatrixCharacteristics(input1.getName()).getRows();
+               
+               if(_cbind) {
+                       JavaPairRDD<Long,FrameBlock> in1Aligned = 
in1.mapToPair(new ReduceSideAppendAlignFunction(leftRows));                   
+                       in1Aligned = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey(in1Aligned);                     
+                       JavaPairRDD<Long,FrameBlock> in2Aligned = 
in2.mapToPair(new ReduceSideAppendAlignFunction(leftRows));
+                       in2Aligned = (JavaPairRDD<Long, FrameBlock>) 
RDDAggregateUtils.mergeByFrameKey(in2Aligned);                     
+                       
+                       out = in1Aligned.join(in2Aligned).mapValues(new 
ReduceSideColumnsFunction(_cbind));
+               } else {        //rbind
+                       JavaPairRDD<Long,FrameBlock> right = in2.mapToPair( new 
ReduceSideAppendRowsFunction(leftRows));
+                       out = in1.union(right);
+               }
+               
+               //put output RDD handle into symbol table
+               updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+               sec.setRDDHandleForVariable(output.getName(), out);
+               sec.addLineageRDD(output.getName(), input1.getName());
+               sec.addLineageRDD(output.getName(), input2.getName());          
+       }
+       
+       /**
+        * 
+        */
+       private static class ReduceSideColumnsFunction implements 
Function<Tuple2<FrameBlock, FrameBlock>, FrameBlock> 
+       {
+               private static final long serialVersionUID = 
-97824903649667646L;
+
+               private boolean _cbind = true;
+                               
+               public ReduceSideColumnsFunction(boolean cbind) {
+                       _cbind = cbind;
+               }
+               
+               @Override
+               public FrameBlock call(Tuple2<FrameBlock, FrameBlock> arg0)
+                       throws Exception 
+               {
+                       FrameBlock left = arg0._1();
+                       FrameBlock right = arg0._2();
+                       
+                       return left.appendOperations(right, new FrameBlock(), 
_cbind);
+               }
+       }
+
+       /**
+        * 
+        */
+       private static class ReduceSideAppendRowsFunction implements 
PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
+       {
+               private static final long serialVersionUID = 
1723795153048336791L;
+
+               private long _offset;
+                               
+               public ReduceSideAppendRowsFunction(long offset) {
+                       _offset = offset;
+               }
+               
+               @Override
+               public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> 
arg0)
+                       throws Exception 
+               {
+                       return new Tuple2<Long, FrameBlock>(arg0._1()+_offset, 
arg0._2());
+               }
+       }
+
+       /**
+        * 
+        */
+       private static class ReduceSideAppendAlignFunction implements 
PairFunction<Tuple2<Long, FrameBlock>, Long, FrameBlock> 
+       {
+               private static final long serialVersionUID = 
5850400295183766409L;
+
+               private long _rows;
+                               
+               public ReduceSideAppendAlignFunction(long rows) {
+                       _rows = rows;
+               }
+               
+               @Override
+               public Tuple2<Long,FrameBlock> call(Tuple2<Long, FrameBlock> 
arg0)
+                       throws Exception 
+               {
+                       FrameBlock resultBlock = new 
FrameBlock(arg0._2().getSchema());
+                                               
+                       long index = 
(arg0._1()/OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE)*OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE+1;
+                       int maxRows = (int) (_rows - index+1 >= 
OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE?OptimizerUtils.DEFAULT_FRAME_BLOCKSIZE:_rows
 - index+1);
+
+                       resultBlock.ensureAllocatedColumns(maxRows);
+                       resultBlock = 
resultBlock.leftIndexingOperations(arg0._2(), 0, maxRows-1, 0, 
arg0._2().getNumColumns()-1, new FrameBlock());
+                       
+                       return new Tuple2<Long, FrameBlock>(index, resultBlock);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
new file mode 100644
index 0000000..9e557bb
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
+import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+public class MatrixAppendMSPInstruction extends AppendMSPInstruction
+{
+       public MatrixAppendMSPInstruction(Operator op, CPOperand in1, CPOperand 
in2, CPOperand offset, CPOperand out, boolean cbind, String opcode, String istr)
+       {
+               super(op, in1, in2, offset, out, cbind, opcode, istr);
+       }
+       
+       public static MatrixAppendMSPInstruction parseInstruction ( String str 
) 
+               throws DMLRuntimeException 
+       {
+               String[] parts = 
InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields (parts, 5);
+               
+               String opcode = parts[0];
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand offset = new CPOperand(parts[3]);
+               CPOperand out = new CPOperand(parts[4]);
+               boolean cbind = Boolean.parseBoolean(parts[5]);
+               
+               if(!opcode.equalsIgnoreCase("mappend"))
+                       throw new DMLRuntimeException("Unknown opcode while 
parsing a MatrixAppendMSPInstruction: " + str);
+               
+               return new MatrixAppendMSPInstruction(
+                               new 
ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1)), 
+                               in1, in2, offset, out, cbind, opcode, str);
+       }
+       
+       @Override
+       public void processInstruction(ExecutionContext ec)
+               throws DMLRuntimeException 
+       {
+               // map-only append (rhs must be vector and fit in mapper mem)
+               SparkExecutionContext sec = (SparkExecutionContext)ec;
+               checkBinaryAppendInputCharacteristics(sec, _cbind, false, 
false);
+               MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());
+               MatrixCharacteristics mc2 = 
sec.getMatrixCharacteristics(input2.getName());
+               int brlen = mc1.getRowsPerBlock();
+               int bclen = mc1.getColsPerBlock();
+               
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+               PartitionedBroadcast<MatrixBlock> in2 = 
sec.getBroadcastForVariable( input2.getName() );
+               long off = sec.getScalarInput( _offset.getName(), 
_offset.getValueType(), _offset.isLiteral()).getLongValue();
+               
+               //execute map-append operations (partitioning preserving if 
#in-blocks = #out-blocks)
+               JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
+               if( preservesPartitioning(mc1, mc2, _cbind) ) {
+                       out = in1.mapPartitionsToPair(
+                                       new MapSideAppendPartitionFunction(in2, 
_cbind, off, brlen, bclen), true);
+               }
+               else {
+                       out = in1.flatMapToPair(
+                                       new MapSideAppendFunction(in2, _cbind, 
off, brlen, bclen));
+               }
+               
+               //put output RDD handle into symbol table
+               updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+               sec.setRDDHandleForVariable(output.getName(), out);
+               sec.addLineageRDD(output.getName(), input1.getName());
+               sec.addLineageBroadcast(output.getName(), input2.getName());
+       }
+       
+       /**
+        * Determines whether the map-side append can preserve the partitioning of
+        * the lhs input. This is the case if the appended rows/columns fit into the
+        * existing boundary block, i.e., if the number of blocks along the append
+        * dimension does not change.
+        * 
+        * @param mcIn1 matrix characteristics of the lhs (rdd) input
+        * @param mcIn2 matrix characteristics of the rhs (broadcast) input
+        * @param cbind true for column append (cbind), false for row append (rbind)
+        * @return true if the lhs and the output have the same number of blocks
+        *         along the append dimension
+        */
+       private boolean preservesPartitioning( MatrixCharacteristics mcIn1, MatrixCharacteristics mcIn2, boolean cbind )
+       {
+               //lengths along the append dimension; input and output blocks are both
+               //computed with the block size of the lhs input
+               long len1 = cbind ? mcIn1.getCols() : mcIn1.getRows();
+               long len2 = cbind ? mcIn2.getCols() : mcIn2.getRows();
+               int blen = cbind ? mcIn1.getColsPerBlock() : mcIn1.getRowsPerBlock();
+               
+               long nblksIn1 = (long)Math.ceil((double)len1/blen);
+               long nblksOut = (long)Math.ceil(((double)len1+len2)/blen);
+               
+               //mappend is partitioning-preserving if in-block append (e.g., common case of colvector append)
+               return (nblksIn1 == nblksOut);
+       }
+       
+       /**
+        * Map-side append function (flatMap), used when the lhs partitioning cannot
+        * be preserved. Every lhs block is either passed through unchanged (case 1),
+        * passed through together with the matching broadcast rhs block (case 2), or
+        * merged with the rhs block, which may spill into one additional block (case 3).
+        */
+       private static class MapSideAppendFunction implements  PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, MatrixBlock> 
+       {
+               private static final long serialVersionUID = 2738541014432173450L;
+               
+               //partitioned broadcast of the rhs input
+               private PartitionedBroadcast<MatrixBlock> _pm = null;
+               //true for cbind (column append), false for rbind (row append)
+               private boolean _cbind = true;
+               //presumably the lhs length along the append dimension; only used to locate the boundary block
+               private long _offset; 
+               private int _brlen; 
+               private int _bclen;
+               //1-based index of the boundary block along the append dimension
+               //(a row block index for rbind, despite the name)
+               private long _lastBlockColIndex;
+               
+               public MapSideAppendFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
+               {
+                       _pm = binput;
+                       _cbind = cbind;
+                       
+                       _offset = offset;
+                       _brlen = brlen;
+                       _bclen = bclen;
+                       
+                       //check for boundary block
+                       //(index of the block containing position 'offset' along the append dimension)
+                       int blen = cbind ? bclen : brlen;
+                       _lastBlockColIndex = (long)Math.ceil((double)_offset/blen);
+               }
+               
+               /**
+                * Appends the matching rhs block to the given lhs block if it is the boundary block.
+                * 
+                * @param kv lhs block with its block indexes
+                * @return the output blocks for this lhs block (one or two)
+                */
+               @Override
+               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
+                       throws Exception 
+               {
+                       ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
+                       
+                       IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(kv);
+                       MatrixIndexes ix = in1.getIndexes();
+                       
+                       //case 1: pass through of non-boundary blocks
+                       if( (_cbind?ix.getColumnIndex():ix.getRowIndex())!=_lastBlockColIndex ) 
+                       {
+                               ret.add( kv );
+                       }
+                       //case 2: pass through full input block and rhs block 
+                       //(the boundary block is full, so the rhs block becomes the next block)
+                       else if( _cbind && in1.getValue().getNumColumns() == _bclen 
+                                       || !_cbind && in1.getValue().getNumRows() == _brlen) 
+                       {                               
+                               //output lhs block
+                               ret.add( kv );
+                               
+                               //output shallow copy of rhs block
+                               if( _cbind ) {
+                                       ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
+                                                       new MatrixIndexes(ix.getRowIndex(), ix.getColumnIndex()+1),
+                                                       _pm.getBlock((int)ix.getRowIndex(), 1)) );
+                               }
+                               else { //rbind
+                                       ret.add( new Tuple2<MatrixIndexes, MatrixBlock>(
+                                                       new MatrixIndexes(ix.getRowIndex()+1, ix.getColumnIndex()),
+                                                       _pm.getBlock(1, (int)ix.getColumnIndex())) );   
+                               }
+                       }
+                       //case 3: append operation on boundary block
+                       //(partially filled boundary block; the result may spill into a second block)
+                       else 
+                       {
+                               //allocate space for the output value
+                               //(second output block only if the combined size exceeds the block size)
+                               ArrayList<IndexedMatrixValue> outlist=new ArrayList<IndexedMatrixValue>(2);
+                               IndexedMatrixValue first = new IndexedMatrixValue(new MatrixIndexes(ix), new MatrixBlock());
+                               outlist.add(first);
+                               
+                               MatrixBlock value_in2 = null;
+                               if( _cbind ) {
+                                       value_in2 = _pm.getBlock((int)ix.getRowIndex(), 1);
+                                       if(in1.getValue().getNumColumns()+value_in2.getNumColumns()>_bclen) {
+                                               IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
+                                               second.getIndexes().setIndexes(ix.getRowIndex(), ix.getColumnIndex()+1);
+                                               outlist.add(second);
+                                       }
+                               }
+                               else { //rbind
+                                       value_in2 = _pm.getBlock(1, (int)ix.getColumnIndex());
+                                       if(in1.getValue().getNumRows()+value_in2.getNumRows()>_brlen) {
+                                               IndexedMatrixValue second=new IndexedMatrixValue(new MatrixIndexes(), new MatrixBlock());
+                                               second.getIndexes().setIndexes(ix.getRowIndex()+1, ix.getColumnIndex());
+                                               outlist.add(second);
+                                       }
+                               }
+       
+                               //fills the preallocated output blocks in 'outlist'
+                               //NOTE(review): meaning of the trailing 'true, 0' arguments is not visible here — confirm
+                               OperationsOnMatrixValues.performAppend(in1.getValue(), value_in2, outlist, _brlen, _bclen, _cbind, true, 0);    
+                               ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
+                       }
+                       
+                       return ret;
+               }
+       }
+       
+       /**
+        * Partitioning-preserving map-side append function (mapPartitions). It is
+        * only used when the rhs fits into the lhs boundary blocks, so each input
+        * block produces exactly one output block with unchanged block indexes.
+        */
+       private static class MapSideAppendPartitionFunction implements  PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixBlock>>, MatrixIndexes, MatrixBlock> 
+       {
+               private static final long serialVersionUID = 5767240739761027220L;
+
+               //partitioned broadcast of the rhs input
+               private PartitionedBroadcast<MatrixBlock> _pm = null;
+               //true for cbind (column append), false for rbind (row append)
+               private boolean _cbind = true;
+               //1-based index of the boundary block along the append dimension
+               //(a row block index for rbind, despite the name)
+               private long _lastBlockColIndex = -1;
+               
+               public MapSideAppendPartitionFunction(PartitionedBroadcast<MatrixBlock> binput, boolean cbind, long offset, int brlen, int bclen)  
+               {
+                       _pm = binput;
+                       _cbind = cbind;
+                       
+                       //check for boundary block
+                       //(offset and block sizes are only needed to locate the boundary block)
+                       int blen = cbind ? bclen : brlen;
+                       _lastBlockColIndex = (long)Math.ceil((double)offset/blen);
+               }
+
+               @Override
+               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> arg0)
+                       throws Exception 
+               {
+                       return new MapAppendPartitionIterator(arg0);
+               }
+               
+               /**
+                * Lazy mappend iterator to prevent materialization of entire partition output in-memory.
+                * The implementation via mapPartitions is required to preserve partitioning information,
+                * which is important for performance. 
+                */
+               private class MapAppendPartitionIterator extends LazyIterableIterator<Tuple2<MatrixIndexes, MatrixBlock>>
+               {
+                       public MapAppendPartitionIterator(Iterator<Tuple2<MatrixIndexes, MatrixBlock>> in) {
+                               super(in);
+                       }
+
+                       @Override
+                       protected Tuple2<MatrixIndexes, MatrixBlock> computeNext(Tuple2<MatrixIndexes, MatrixBlock> arg)
+                               throws Exception
+                       {
+                               MatrixIndexes ix = arg._1();
+                               MatrixBlock in1 = arg._2();
+                               
+                               //case 1: pass through of non-boundary blocks
+                               if( (_cbind?ix.getColumnIndex():ix.getRowIndex()) != _lastBlockColIndex ) {
+                                       return arg;
+                               }
+                               //case 2: append operation on boundary block
+                               //(the rhs fits into this block, so the output keeps the input block indexes)
+                               else {
+                                       int rowix = _cbind ? (int)ix.getRowIndex() : 1;
+                                       int colix = _cbind ? 1 : (int)ix.getColumnIndex();
+                                       MatrixBlock in2 = _pm.getBlock(rowix, colix);
+                                       MatrixBlock out = in1.appendOperations(in2, new MatrixBlock(), _cbind);
+                                       return new Tuple2<MatrixIndexes,MatrixBlock>(ix, out);
+                               }       
+                       }                       
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
new file mode 100644
index 0000000..644fcd2
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendRSPInstruction.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
+
+public class MatrixAppendRSPInstruction extends AppendRSPInstruction
+{
+       public MatrixAppendRSPInstruction(Operator op, CPOperand in1, CPOperand in2, CPOperand out, boolean cbind, String opcode, String istr)
+       {
+               super(op, in1, in2, out, cbind, opcode, istr);
+       }
+       
+       /**
+        * Parses a reduce-side matrix append instruction with the fields
+        * opcode, in1, in2, out, cbind.
+        * 
+        * @param str instruction string
+        * @return the parsed instruction
+        * @throws DMLRuntimeException if the opcode is not rappend
+        */
+       public static MatrixAppendRSPInstruction parseInstruction ( String str ) 
+               throws DMLRuntimeException 
+       {       
+               String[] fields = InstructionUtils.getInstructionPartsWithValueType(str);
+               InstructionUtils.checkNumFields (fields, 4);
+               
+               String opcode = fields[0];
+               CPOperand in1 = new CPOperand(fields[1]);
+               CPOperand in2 = new CPOperand(fields[2]);
+               CPOperand out = new CPOperand(fields[3]);
+               boolean cbind = Boolean.parseBoolean(fields[4]);
+               
+               if( opcode.equalsIgnoreCase("rappend") ) {
+                       ReorgOperator op = new ReorgOperator(OffsetColumnIndex.getOffsetColumnIndexFnObject(-1));
+                       return new MatrixAppendRSPInstruction(op, in1, in2, out, cbind, opcode, str);
+               }
+               throw new DMLRuntimeException("Unknown opcode while parsing a MatrixAppendRSPInstruction: " + str);
+       }
+       
+       @Override
+       public void processInstruction(ExecutionContext ec)
+               throws DMLRuntimeException 
+       {
+               //reduce-side append via join: blocks with identical indexes are merged
+               //1:1, so the output must consist of a single block along the append
+               //dimension (one column block for cbind, one row block for rbind)
+               SparkExecutionContext sec = (SparkExecutionContext)ec;
+               checkBinaryAppendInputCharacteristics(sec, _cbind, true, false);
+               
+               JavaPairRDD<MatrixIndexes,MatrixBlock> left = sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+               JavaPairRDD<MatrixIndexes,MatrixBlock> right = sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
+               
+               //join by block index and append the pairs (partitioning preserving)
+               JavaPairRDD<MatrixIndexes,MatrixBlock> appended = left.join(right).mapValues(new ReduceSideAppendFunction(_cbind));
+
+               //register the output rdd together with the lineage of both inputs
+               updateBinaryAppendOutputMatrixCharacteristics(sec, _cbind);
+               sec.setRDDHandleForVariable(output.getName(), appended);
+               sec.addLineageRDD(output.getName(), input1.getName());
+               sec.addLineageRDD(output.getName(), input2.getName());
+       }
+       
+       /**
+        * Appends the lhs and rhs blocks of a joined pair into a new output block.
+        */
+       private static class ReduceSideAppendFunction implements Function<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock> 
+       {
+               private static final long serialVersionUID = -6763904972560309095L;
+
+               private boolean _cbind = true;
+               
+               public ReduceSideAppendFunction(boolean cbind) {
+                       _cbind = cbind;
+               }
+               
+               @Override
+               public MatrixBlock call(Tuple2<MatrixBlock, MatrixBlock> arg0)
+                       throws Exception 
+               {
+                       //_1: lhs block, _2: rhs block of the same block index
+                       return arg0._1().appendOperations(arg0._2(), new MatrixBlock(), _cbind);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 64bc6fe..a4f826c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -777,6 +777,8 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        ret._schema.addAll(that._schema);
                        ret._colnames = new ArrayList<String>(_colnames);
                        ret._colnames.addAll(that._colnames);
+                       ret._colmeta = new ArrayList<ColumnMetadata>(_colmeta);
+                       ret._colmeta.addAll(that._colmeta);
                        
                        //concatenate column data (w/ deep copy to prevent side 
effects)
                        for( Array tmp : _coldata )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 6bce4ff..fa17fcd 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,7 +23,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData;
@@ -596,4 +599,20 @@ public class UtilFunctions
                
                return Arrays.asList(schema);
        }
+       
+       
+       /**
+        * Returns the data type (e.g., matrix or frame) of the operand at the
+        * given position of an instruction string.
+        * 
+        * @param str instruction string
+        * @param index position of the operand within the instruction parts, as
+        *        produced by InstructionUtils.getInstructionPartsWithValueType
+        * @return data type of the referenced operand
+        */
+       public static DataType getDataType(String str, int index)
+       {
+               String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+               CPOperand operand = new CPOperand(parts[index]);
+               
+               return operand.getDataType();
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
new file mode 100644
index 0000000..20c4a27
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.BinaryOp;
+import org.apache.sysml.hops.BinaryOp.AppendMethod;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.UtilFunctions;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class FrameAppendDistTest extends AutomatedTestBase
+{
+       private final static String TEST_NAME = "FrameAppend";
+       private final static String TEST_DIR = "functions/frame/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + FrameAppendDistTest.class.getSimpleName() + "/";
+
+       private final static double epsilon=0.0000000001;
+       private final static int min=1;
+       private final static int max=100;
+       
+       private final static int rows1 = 1692;
+       private final static int rows2 = 1192;
+       //usecase a: inblock single
+       private final static int cols1a = 375;
+       private final static int cols2a = 92;
+       //usecase b: inblock multiple
+       private final static int cols1b = 1059;
+       //usecase d: outblock blocksize 
+       private final static int cols1d = 1460;
+       private final static int cols3d = 990;
+       
+       private final static double sparsity1 = 0.5;
+       private final static double sparsity2 = 0.01;
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, 
+                               new String[] {"C"}));
+       }
+
+       @Test
+       public void testAppendInBlock1DenseSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, false, AppendMethod.MR_RAPPEND, false);
+       }   
+       
+       @Test
+       public void testAppendInBlock1SparseSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols2a, true, AppendMethod.MR_RAPPEND, false);
+       }   
+       
+       @Test
+       public void testAppendInBlock1DenseRBindSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows2, cols1a, cols1a, false, AppendMethod.MR_RAPPEND, true);
+       }   
+       
+       @Test
+       public void testAppendInBlock1SparseRBindSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1a, cols1a, true, AppendMethod.MR_RAPPEND, true);
+       }   
+       
+       //NOTE: mappend only applied for m2_cols<=blocksize
+       @Test
+       public void testMapAppendInBlock2DenseSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, false, AppendMethod.MR_MAPPEND, false);
+       }
+       
+       @Test
+       public void testMapAppendInBlock2SparseSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1b, cols2a, true, AppendMethod.MR_MAPPEND, false);
+       }
+       
+       @Test
+       public void testMapAppendOutBlock2DenseSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, false, AppendMethod.MR_MAPPEND, false);
+       }
+       
+       @Test
+       public void testMapAppendOutBlock2SparseSP() {
+               commonAppendTest(RUNTIME_PLATFORM.SPARK, rows1, rows1, cols1d, cols3d, true, AppendMethod.MR_MAPPEND, false);
+       }
+       
+       /**
+        * Generates two random frames A and B with mixed schemas, appends them in
+        * DML (cbind or rbind) and compares the result against the R output.
+        * 
+        * @param platform runtime platform used for the DML script
+        * @param rows1 number of rows of A
+        * @param rows2 number of rows of B
+        * @param cols1 number of columns of A
+        * @param cols2 number of columns of B
+        * @param sparse if true, generate the input data with the lower sparsity
+        * @param forcedAppendMethod append method to force, or null to keep the default choice
+        * @param rbind true for row append (rbind), false for column append (cbind)
+        */
+       public void commonAppendTest(RUNTIME_PLATFORM platform, int rows1, int rows2, int cols1, int cols2, boolean sparse, AppendMethod forcedAppendMethod, boolean rbind)
+       {
+               TestConfiguration config = getAndLoadTestConfiguration(TEST_NAME);
+           
+               RUNTIME_PLATFORM prevPlfm=rtplatform;
+               
+               double sparsity = (sparse) ? sparsity2 : sparsity1; 
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               
+               try
+               {
+                       if(forcedAppendMethod != null) {
+                               BinaryOp.FORCED_APPEND_METHOD = forcedAppendMethod;
+                       }
+                       rtplatform = platform;
+                       if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                               DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+       
+                       config.addVariable("rows", rows1);
+                       config.addVariable("cols", cols1);
+                 
+                       /* This is for running the junit test the new way, i.e., construct the arguments directly */
+                       String RI_HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = RI_HOME + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-explain","-args", input("A"), 
+                                       Long.toString(rows1), Long.toString(cols1),
+                                       input("B"), Long.toString(rows2), Long.toString(cols2),
+                                       output("C"), (rbind? "rbind": "cbind")};
+                       fullRScriptName = RI_HOME + TEST_NAME + ".R";
+                       rCmd = "Rscript" + " " + fullRScriptName + " " + 
+                              inputDir() + " " + expectedDir() + " " + (rbind? "rbind": "cbind");
+       
+                       //initialize the frame data.
+                       List<ValueType> lschemaA = Arrays.asList(genMixSchema(cols1));
+                       double[][] A = getRandomMatrix(rows1, cols1, min, max, sparsity, 1111 /*\\System.currentTimeMillis()*/);
+                       writeInputFrameWithMTD("A", A, true, lschemaA, OutputInfo.BinaryBlockOutputInfo);
+               
+                       List<ValueType> lschemaB = Arrays.asList(genMixSchema(cols2));
+                       double[][] B = getRandomMatrix(rows2, cols2, min, max, sparsity, 2345 /*\\System.currentTimeMillis()*/);
+                       writeInputFrameWithMTD("B", B, true, lschemaB, OutputInfo.BinaryBlockOutputInfo);
+                               
+                       boolean exceptionExpected = false;
+                       int expectedNumberOfJobs = -1;
+                       runTest(true, exceptionExpected, null, expectedNumberOfJobs);
+                       runRScript(true);
+
+                       //expected output shape and schema: rbind keeps the columns of A,
+                       //cbind concatenates the columns (and schemas) of A and B
+                       long expectedRows = rbind ? rows1 + rows2 : rows1;
+                       long expectedCols = rbind ? cols1 : cols1 + cols2;
+                       List<ValueType> lschemaC = new ArrayList<ValueType>(lschemaA);
+                       if( !rbind )
+                               lschemaC.addAll(lschemaB);
+                       ValueType[] schemaC = lschemaC.toArray(new ValueType[0]);
+                       
+                       for(String file: config.getOutputFiles())
+                       {
+                               FrameBlock frameBlock = readDMLFrameFromHDFS(file, InputInfo.BinaryBlockInputInfo);
+                               Assert.assertEquals("Wrong number of output rows for " + file, expectedRows, frameBlock.getNumRows());
+                               Assert.assertEquals("Wrong number of output columns for " + file, expectedCols, frameBlock.getNumColumns());
+                               
+                               MatrixCharacteristics md = new MatrixCharacteristics(frameBlock.getNumRows(), frameBlock.getNumColumns(), -1, -1);
+                               FrameBlock frameRBlock = readRFrameFromHDFS(file+".csv", InputInfo.CSVInputInfo, md);
+                               verifyFrameData(frameBlock, frameRBlock, schemaC);
+                       }
+               }
+               catch(Exception ex) {
+                       //cause is preserved, no need to print the stack trace separately
+                       throw new RuntimeException(ex);
+               }
+               finally
+               {
+                       //reset execution platform
+                       rtplatform = prevPlfm;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       BinaryOp.FORCED_APPEND_METHOD = null;
+               }
+       }
+       
+       /**
+        * Creates a mixed schema: a quarter each of string, double and int columns,
+        * with all remaining columns boolean.
+        * 
+        * @param cols number of columns
+        * @return schema of length cols
+        */
+       ValueType[] genMixSchema(int cols)
+       {
+               List<ValueType> schemaMixedLargeListStr = Collections.nCopies(cols/4, ValueType.STRING);
+               List<ValueType> schemaMixedLargeListDble = Collections.nCopies(cols/4, ValueType.DOUBLE);
+               List<ValueType> schemaMixedLargeListInt = Collections.nCopies(cols/4, ValueType.INT);
+               List<ValueType> schemaMixedLargeListBool = Collections.nCopies(cols-(cols/4)*3, ValueType.BOOLEAN);
+
+               final List<ValueType> schemaMixedLargeList = new ArrayList<ValueType>(schemaMixedLargeListStr);
+               schemaMixedLargeList.addAll(schemaMixedLargeListDble);
+               schemaMixedLargeList.addAll(schemaMixedLargeListInt);
+               schemaMixedLargeList.addAll(schemaMixedLargeListBool);
+               
+               return schemaMixedLargeList.toArray(new ValueType[schemaMixedLargeList.size()]);
+       }
+       
+       /**
+        * Compares two frames cell by cell, interpreting each cell according to
+        * the value type of its column.
+        * 
+        * @param frame1 frame produced by DML
+        * @param frame2 frame produced by R
+        * @param schema value types of the columns of frame1
+        */
+       private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) {
+               for ( int i=0; i<frame1.getNumRows(); ++i ) {
+                       for( int j=0; j<frame1.getNumColumns(); j++ ) {
+                               Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j)));
+                               Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j)));
+                               if( TestUtils.compareToR(schema[j], val1, val2, epsilon) != 0)
+                                       Assert.fail("The DML data for cell ("+ i + "," + j + ") is " + val1 + 
+                                                       ", not same as the R value " + val2);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameAppend.R 
b/src/test/scripts/functions/frame/FrameAppend.R
new file mode 100644
index 0000000..f97916d
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameAppend.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+options(digits=22)
+library("Matrix")
+
+A=read.csv(paste(args[1], "A.csv", sep=""), header = FALSE, 
stringsAsFactors=FALSE)
+B=read.csv(paste(args[1], "B.csv", sep=""), header = FALSE, 
stringsAsFactors=FALSE)
+if(args[3] == "rbind") {
+       C=rbind(A, B)
+} else {
+       C=cbind2(A, B)
+}
+write.csv(C, paste(args[2], "C.csv", sep=""), row.names = FALSE, quote = FALSE)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c22f239e/src/test/scripts/functions/frame/FrameAppend.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameAppend.dml 
b/src/test/scripts/functions/frame/FrameAppend.dml
new file mode 100644
index 0000000..eea118e
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameAppend.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A=read($1, data_type="frame", rows=$2, cols=$3, format="binary")
+B=read($4, data_type="frame", rows=$5, cols=$6, format="binary")
+if ($8 == "rbind") {
+       C=rbind(A, B)
+} else {
+       C=cbind(A, B)
+}
+write(C, $7, format="binary")

Reply via email to