Repository: incubator-systemml
Updated Branches:
  refs/heads/master 919d919be -> 9451a0fd8


[SYSTEMML-562] Spark frame reblock instruction (text), tests

This patch generalizes the existing Spark reblock instruction to also
handle frames. To stay compatible with the file-based transform, frame
reblock covers only the text format for now; CSV is not included yet.
The patch also fixes the distributed frame-to-matrix converter so that
it handles unaligned blocks.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9451a0fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9451a0fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9451a0fd

Branch: refs/heads/master
Commit: 9451a0fd89bcf02cc9ba572e8092b9f1447c4d86
Parents: 919d919
Author: Matthias Boehm <[email protected]>
Authored: Fri Jun 10 00:59:26 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jun 10 12:45:17 2016 -0700

----------------------------------------------------------------------
 .../apache/sysml/hops/recompile/Recompiler.java |  36 ++-
 .../RewriteAlgebraicSimplificationStatic.java   |   2 +-
 .../rewrite/RewriteBlockSizeAndReblock.java     |   7 +-
 .../controlprogram/caching/FrameObject.java     |   4 +
 .../context/SparkExecutionContext.java          |  26 +-
 .../spark/CSVReblockSPInstruction.java          |   2 +-
 .../instructions/spark/CastSPInstruction.java   |   5 +-
 .../spark/ReblockSPInstruction.java             |  97 +++++--
 .../spark/utils/FrameRDDConverterUtils.java     |  90 +++---
 .../functions/frame/FrameConverterTest.java     |   8 +-
 .../functions/frame/FrameMatrixReblockTest.java | 277 +++++++++++++++++++
 .../misc/RewritePushdownSumBinaryMult.java      |  38 +--
 .../functions/frame/FrameMatrixReblock.dml      |  25 ++
 13 files changed, 480 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index 9daefdf..ec89775 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -78,6 +78,8 @@ import 
org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
 import org.apache.sysml.runtime.controlprogram.ProgramBlock;
 import org.apache.sysml.runtime.controlprogram.WhileProgramBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
@@ -94,6 +96,7 @@ import 
org.apache.sysml.runtime.instructions.mr.RandInstruction;
 import org.apache.sysml.runtime.instructions.mr.SeqInstruction;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.util.MapReduceTool;
@@ -1907,8 +1910,8 @@ public class Recompiler
        public static boolean checkCPReblock(ExecutionContext ec, String varin) 
                throws DMLRuntimeException
        {
-               MatrixObject in = ec.getMatrixObject(varin);
-               MatrixCharacteristics mc = in.getMatrixCharacteristics();
+               CacheableData<?> obj = ec.getCacheableData(varin);
+               MatrixCharacteristics mc = ec.getMatrixCharacteristics(varin);
                
                long rows = mc.getRows();
                long cols = mc.getCols();
@@ -1924,8 +1927,8 @@ public class Recompiler
                //robustness for usage through mlcontext (key/values of input 
rdds are 
                //not serializable for text; also bufferpool rdd read only 
supported for 
                // binarycell and binaryblock)
-               MatrixFormatMetaData iimd = (MatrixFormatMetaData) 
in.getMetaData();
-               if( in.getRDDHandle() != null 
+               MatrixFormatMetaData iimd = (MatrixFormatMetaData) 
obj.getMetaData();
+               if( obj.getRDDHandle() != null 
                        && iimd.getInputInfo() != 
InputInfo.BinaryBlockInputInfo 
                        && iimd.getInputInfo() != InputInfo.BinaryCellInputInfo 
) {
                        return false;
@@ -2061,7 +2064,7 @@ public class Recompiler
         * @param out
         * @throws DMLRuntimeException 
         */
-       public static void executeInMemoryReblock(ExecutionContext ec, String 
varin, String varout) 
+       public static void executeInMemoryMatrixReblock(ExecutionContext ec, 
String varin, String varout) 
                throws DMLRuntimeException
        {
                MatrixObject in = ec.getMatrixObject(varin);
@@ -2079,6 +2082,29 @@ public class Recompiler
        
        /**
         * 
+        * @param ec
+        * @param varin
+        * @param varout
+        * @throws DMLRuntimeException
+        */
+       public static void executeInMemoryFrameReblock(ExecutionContext ec, 
String varin, String varout) 
+               throws DMLRuntimeException
+       {
+               FrameObject in = ec.getFrameObject(varin);
+               FrameObject out = ec.getFrameObject(varout);
+
+               //read text input frame (through buffer pool, frame object 
carries all relevant
+               //information including additional arguments for csv reblock)
+               FrameBlock fb = in.acquireRead(); 
+               
+               //set output (incl update matrix characteristics)
+               out.acquireModify( fb );
+               out.release();
+               in.release();                           
+       }
+       
+       /**
+        * 
         * @param fname
         * @return
         * @throws DMLRuntimeException

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
index c36c01f..e903a03 100644
--- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
+++ 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
@@ -145,7 +145,7 @@ public class RewriteAlgebraicSimplificationStatic extends 
HopRewriteRule
                        hi = simplifyBushyBinaryOperation(hop, hi, i);       
//e.g., (X*(Y*(Z%*%v))) -> (X*Y)*(Z%*%v)
                        hi = simplifyUnaryAggReorgOperation(hop, hi, i);     
//e.g., sum(t(X)) -> sum(X)
                        hi = pushdownUnaryAggTransposeOperation(hop, hi, i); 
//e.g., colSums(t(X)) -> t(rowSums(X))
-                       hi = pushdownSumBinaryMult(hop, hi, i);                 
         //e.g., sum(lamda*X) -> lamda*sum(X)
+                       hi = pushdownSumBinaryMult(hop, hi, i);              
//e.g., sum(lamda*X) -> lamda*sum(X)
                        hi = simplifyUnaryPPredOperation(hop, hi, i);        
//e.g., abs(ppred()) -> ppred(), others: round, ceil, floor
                        hi = simplifyTransposedAppend(hop, hi, i);           
//e.g., t(cbind(t(A),t(B))) -> rbind(A,B);
                        hi = fuseBinarySubDAGToUnaryOperation(hop, hi, i);   
//e.g., X*(1-X)-> sprop(X) || 1/(1+exp(-X)) -> sigmoid(X) || X*(X>0) -> selp(X)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
index ce09a6f..92dcd69 100644
--- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
+++ 
b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java
@@ -27,6 +27,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.DataOp;
 import org.apache.sysml.hops.FunctionOp;
 import org.apache.sysml.hops.Hop;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.hops.Hop.DataOpTypes;
 import org.apache.sysml.hops.Hop.FileFormatTypes;
 import org.apache.sysml.hops.Hop.ParamBuiltinOp;
@@ -92,8 +93,9 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule
                if (hop instanceof DataOp) 
                {
                        // if block size does not match
-                       if(    canReblock && hop.getDataType() == 
DataType.MATRIX
-                               && (hop.getRowsInBlock() != GLOBAL_BLOCKSIZE || 
hop.getColsInBlock() != GLOBAL_BLOCKSIZE) ) 
+                       if( canReblock //TODO change frame condition to != 
BINARY once transform over frames supported
+                               && ((hop.getDataType() == DataType.MATRIX && 
(hop.getRowsInBlock() != GLOBAL_BLOCKSIZE || hop.getColsInBlock() != 
GLOBAL_BLOCKSIZE)
+                                 ||(hop.getDataType() == DataType.FRAME && 
OptimizerUtils.isSparkExecutionMode() && 
((DataOp)hop).getInputFormatType()==FileFormatTypes.TEXT)))) 
                        {
                                if (((DataOp) hop).getDataOpType() == 
DataOp.DataOpTypes.PERSISTENTREAD) 
                                {
@@ -141,6 +143,7 @@ public class RewriteBlockSizeAndReblock extends 
HopRewriteRule
                                }
                        }
                } 
+               //TODO remove once transform rebased to frames
                else if ( (hop instanceof ParameterizedBuiltinOp && 
((ParameterizedBuiltinOp)hop).getOp() == ParamBuiltinOp.TRANSFORM) ) {
                        
                        // check if there exists a non-csv-write output. If 
yes, add reblock

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index 84a74dd..b850301 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -86,6 +86,10 @@ public class FrameObject extends CacheableData<FrameBlock>
        public FrameObject(FrameObject fo) {
                super(fo);
        }
+       
+       public List<ValueType> getSchema() {
+               return _schema;
+       }
 
        public void setSchema(String schema) {
                if( schema.equals("*") ) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 020c49f..1516286 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -47,6 +47,7 @@ import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.Program;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -532,11 +533,8 @@ public class SparkExecutionContext extends ExecutionContext
        }
        
        /**
-        * Keep the output rdd of spark rdd operations as meta data of matrix 
objects in the 
-        * symbol table.
-        * 
-        * Spark instructions should call this for all matrix outputs.
-        * 
+        * Keep the output rdd of spark rdd operations as meta data of 
matrix/frame 
+        * objects in the symbol table.
         * 
         * @param varname
         * @param rdd
@@ -545,23 +543,9 @@ public class SparkExecutionContext extends ExecutionContext
        public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> 
rdd) 
                throws DMLRuntimeException
        {
-               MatrixObject mo = getMatrixObject(varname);
-               RDDObject rddhandle = new RDDObject(rdd, varname);
-               mo.setRDDHandle( rddhandle );
-       }
-       
-       /**
-        * 
-        * @param varname
-        * @param rdd
-        * @throws DMLRuntimeException
-        */
-       public void setFrameRDDHandleForVariable(String varname, 
JavaPairRDD<?,?> rdd) 
-               throws DMLRuntimeException
-       {
-               FrameObject mo = getFrameObject(varname);
+               CacheableData<?> obj = getCacheableData(varname);
                RDDObject rddhandle = new RDDObject(rdd, varname);
-               mo.setRDDHandle( rddhandle );
+               obj.setRDDHandle( rddhandle );
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
index 6aa7597..98cc5a0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java
@@ -107,7 +107,7 @@ public class CSVReblockSPInstruction extends 
UnarySPInstruction
 
                //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction)
                if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-                       Recompiler.executeInMemoryReblock(sec, 
input1.getName(), output.getName());
+                       Recompiler.executeInMemoryMatrixReblock(sec, 
input1.getName(), output.getName());
                        return;
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
index 8160bd0..3562d18 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java
@@ -79,20 +79,17 @@ public class CastSPInstruction extends UnarySPInstruction
                        in = ((JavaPairRDD<Long, FrameBlock>)in).mapToPair(new 
LongFrameToLongWritableFrameFunction());
                        out = FrameRDDConverterUtils.binaryBlockToMatrixBlock(
                                (JavaPairRDD<LongWritable, FrameBlock>)in, 
mcIn, mcOut);
-                       
-                       sec.setRDDHandleForVariable(output.getName(), out);
                }
                else if( opcode.equals(UnaryCP.CAST_AS_FRAME_OPCODE) ) {
                        out = 
FrameRDDConverterUtils.matrixBlockToBinaryBlockLongIndex(sec.getSparkContext(), 
                                (JavaPairRDD<MatrixIndexes, MatrixBlock>)in, 
mcIn);
-               
-                       sec.setFrameRDDHandleForVariable(output.getName(), out);
                }
                else {
                        throw new DMLRuntimeException("Unsupported spark cast 
operation: "+opcode);
                }
                
                //update output statistics and add lineage
+               sec.setRDDHandleForVariable(output.getName(), out);
                updateUnaryOutputMatrixCharacteristics(sec, input1.getName(), 
output.getName());
                sec.addLineageRDD(output.getName(), input1.getName());
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
index 000bd63..d0128f9 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
@@ -23,18 +23,23 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysml.hops.recompile.Recompiler;
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ExtractBlockForBinaryReblock;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
@@ -77,38 +82,61 @@ public class ReblockSPInstruction extends UnarySPInstruction
        
 
        @Override
-       @SuppressWarnings("unchecked")
        public void processInstruction(ExecutionContext ec)
                throws DMLRuntimeException 
        {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
 
                //set the output characteristics
-               MatrixObject mo = sec.getMatrixObject(input1.getName());
+               CacheableData<?> obj = sec.getCacheableData(input1.getName());
                MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
                MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                mcOut.set(mc.getRows(), mc.getCols(), brlen, bclen, 
mc.getNonZeros());
-
+               
                //get the source format form the meta data
-               MatrixFormatMetaData iimd = (MatrixFormatMetaData) 
mo.getMetaData();
-               if(iimd == null) {
+               MatrixFormatMetaData iimd = (MatrixFormatMetaData) 
obj.getMetaData();
+               if(iimd == null)
                        throw new DMLRuntimeException("Error: Metadata not 
found");
-               }
-               
+               InputInfo iinfo = iimd.getInputInfo();
+
                //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction)
                if( Recompiler.checkCPReblock(sec, input1.getName()) ) {
-                       Recompiler.executeInMemoryReblock(sec, 
input1.getName(), output.getName());
+                       if( input1.getDataType() == DataType.MATRIX )
+                               Recompiler.executeInMemoryMatrixReblock(sec, 
input1.getName(), output.getName());
+                       else if( input1.getDataType() == DataType.FRAME )
+                               Recompiler.executeInMemoryFrameReblock(sec, 
input1.getName(), output.getName());        
                        return;
                }
                
-               if(iimd.getInputInfo() == InputInfo.TextCellInputInfo || 
iimd.getInputInfo() == InputInfo.MatrixMarketInputInfo ) 
+               //execute matrix/frame reblock
+               if( input1.getDataType() == DataType.MATRIX )
+                       processMatrixReblockInstruction(sec, iinfo);
+               else if( input1.getDataType() == DataType.FRAME )
+                       processFrameReblockInstruction(sec, iinfo);
+       }
+       
+       /**
+        * 
+        * @param sec
+        * @param iinfo
+        * @throws DMLRuntimeException
+        */
+       @SuppressWarnings("unchecked")
+       protected void processMatrixReblockInstruction(SparkExecutionContext 
sec, InputInfo iinfo) 
+               throws DMLRuntimeException
+       {
+               MatrixObject mo = sec.getMatrixObject(input1.getName());
+               MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
+               MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
+               
+               if(iinfo == InputInfo.TextCellInputInfo || iinfo == 
InputInfo.MatrixMarketInputInfo ) 
                {
                        //check jdk version (prevent double.parseDouble 
contention on <jdk8)
                        sec.checkAndRaiseValidationWarningJDKVersion();
                        
                        //get the input textcell rdd
                        JavaPairRDD<LongWritable, Text> lines = 
(JavaPairRDD<LongWritable, Text>) 
-                                       
sec.getRDDHandleForVariable(input1.getName(), iimd.getInputInfo());
+                                       
sec.getRDDHandleForVariable(input1.getName(), iinfo);
                        
                        //convert textcell to binary block
                        JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
@@ -118,7 +146,7 @@ public class ReblockSPInstruction extends UnarySPInstruction
                        sec.setRDDHandleForVariable(output.getName(), out);
                        sec.addLineageRDD(output.getName(), input1.getName());
                }
-               else if(iimd.getInputInfo() == InputInfo.CSVInputInfo) {
+               else if(iinfo == InputInfo.CSVInputInfo) {
                        // HACK ALERT: Until we introduces the rewrite to 
insert csvrblock for non-persistent read
                        // throw new DMLRuntimeException("CSVInputInfo is not 
supported for ReblockSPInstruction");
                        CSVReblockSPInstruction csvInstruction = null;
@@ -140,16 +168,16 @@ public class ReblockSPInstruction extends 
UnarySPInstruction
                        csvInstruction.processInstruction(sec);
                        return;
                }
-               else if(iimd.getInputInfo()==InputInfo.BinaryCellInputInfo) 
+               else if(iinfo == InputInfo.BinaryCellInputInfo) 
                {
-                       JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = 
(JavaPairRDD<MatrixIndexes, MatrixCell>) 
sec.getRDDHandleForVariable(input1.getName(), iimd.getInputInfo());
+                       JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = 
(JavaPairRDD<MatrixIndexes, MatrixCell>) 
sec.getRDDHandleForVariable(input1.getName(), iinfo);
                        JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), binaryCells, 
mcOut, outputEmptyBlocks);
                        
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);
                        sec.addLineageRDD(output.getName(), input1.getName());
                }
-               else if(iimd.getInputInfo()==InputInfo.BinaryBlockInputInfo) 
+               else if(iinfo == InputInfo.BinaryBlockInputInfo) 
                {
                        /// HACK ALERT: Workaround for MLContext 
                        if(mc.getRowsPerBlock() == mcOut.getRowsPerBlock() && 
mc.getColsPerBlock() == mcOut.getColsPerBlock()) {
@@ -162,7 +190,7 @@ public class ReblockSPInstruction extends UnarySPInstruction
                                        return;
                                }
                                else {
-                                       throw new DMLRuntimeException("Input 
RDD is not accessible through buffer pool for ReblockSPInstruction:" + 
iimd.getInputInfo());
+                                       throw new DMLRuntimeException("Input 
RDD is not accessible through buffer pool for ReblockSPInstruction:" + iinfo);
                                }
                        }
                        else 
@@ -180,7 +208,42 @@ public class ReblockSPInstruction extends 
UnarySPInstruction
                        }
                }
                else {
-                       throw new DMLRuntimeException("The given InputInfo is 
not implemented for ReblockSPInstruction:" + iimd.getInputInfo());
-               }               
+                       throw new DMLRuntimeException("The given InputInfo is 
not implemented for ReblockSPInstruction:" + iinfo);
+               }
+       }
+       
+       /**
+        * 
+        * @param sec
+        * @param iinfo
+        * @throws DMLRuntimeException
+        */
+       @SuppressWarnings("unchecked")
+       protected void processFrameReblockInstruction(SparkExecutionContext 
sec, InputInfo iinfo) 
+               throws DMLRuntimeException
+       {
+               FrameObject fo = sec.getFrameObject(input1.getName());
+               MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
+               
+               if(iinfo == InputInfo.TextCellInputInfo ) 
+               {
+                       //check jdk version (prevent double.parseDouble 
contention on <jdk8)
+                       sec.checkAndRaiseValidationWarningJDKVersion();
+                       
+                       //get the input textcell rdd
+                       JavaPairRDD<LongWritable, Text> lines = 
(JavaPairRDD<LongWritable, Text>) 
+                                       
sec.getRDDHandleForVariable(input1.getName(), iinfo);
+                       
+                       //convert textcell to binary block
+                       JavaPairRDD<Long, FrameBlock> out = 
+                                       
FrameRDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, 
mcOut, fo.getSchema());
+                       
+                       //put output RDD handle into symbol table
+                       sec.setRDDHandleForVariable(output.getName(), out);
+                       sec.addLineageRDD(output.getName(), input1.getName());
+               }
+               else {
+                       throw new DMLRuntimeException("The given InputInfo is 
not implemented for ReblockSPInstruction:" + iinfo);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 003b016..7c8c08b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.instructions.spark.utils;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -161,23 +162,20 @@ public class FrameRDDConverterUtils
         * @return
         * @throws DMLRuntimeException
         */
-       public static JavaPairRDD<LongWritable, FrameBlock> 
textCellToBinaryBlock(JavaSparkContext sc,
+       public static JavaPairRDD<Long, FrameBlock> 
textCellToBinaryBlock(JavaSparkContext sc,
                        JavaPairRDD<LongWritable, Text> in, 
MatrixCharacteristics mcOut, List<ValueType> schema ) 
                throws DMLRuntimeException  
        {
+               //replicate schema entry if necessary
+               List<ValueType> lschema = (schema.size()==1 && 
mcOut.getCols()>1) ?
+                               Collections.nCopies((int)mcOut.getCols(), 
schema.get(0)) : schema;
                
                //convert input rdd to serializable long/frame block
                JavaPairRDD<Long,Text> input = 
                                in.mapToPair(new 
LongWritableTextToLongTextFunction());
                
-               //Do actual conversion
-               JavaPairRDD<Long,FrameBlock> output = 
textCellToBinaryBlockLongIndex(sc, input, mcOut, schema);
-               
-               //convert input rdd to serializable long/frame block
-               JavaPairRDD<LongWritable,FrameBlock> out = 
-                               output.mapToPair(new 
LongFrameToLongWritableFrameFunction());
-               
-               return out;
+               //do actual conversion
+               return textCellToBinaryBlockLongIndex(sc, input, mcOut, 
lschema);
        }
 
                
@@ -736,13 +734,12 @@ public class FrameRDDConverterUtils
        {
                private static final long serialVersionUID = 
-2654986510471835933L;
                
-               MatrixCharacteristics _mcIn, _mcOut;
+               private MatrixCharacteristics _mcIn;
+               private MatrixCharacteristics _mcOut;
 
-               public BinaryBlockToMatrixBlockFunction(MatrixCharacteristics 
mcIn,
-                               MatrixCharacteristics mcOut) {
-                               
-                               _mcIn = mcIn;           //Frame Characteristics
-                               _mcOut = mcOut;         //Matrix Characteristics
+               public BinaryBlockToMatrixBlockFunction(MatrixCharacteristics 
mcIn, MatrixCharacteristics mcOut) {                      
+                       _mcIn = mcIn;           //Frame Characteristics
+                       _mcOut = mcOut;         //Matrix Characteristics
                }
 
                @Override
@@ -753,46 +750,35 @@ public class FrameRDDConverterUtils
                        FrameBlock blk = arg0._2();
                        
                        ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
+                       long rlen = _mcIn.getRows();
+                       long clen = _mcIn.getCols();
+                       int brlen = _mcOut.getRowsPerBlock();
+                       int bclen = _mcOut.getColsPerBlock();
                        
-                       int _brlenMatrix = _mcOut.getRowsPerBlock();
-                       int _bclenMatrix = _mcOut.getColsPerBlock();
-                       long _rlen = _mcIn.getRows();
-                       long _clen = _mcIn.getCols();
-                       
-                       long lRowId = 0;
-                       while (lRowId < blk.getNumRows()) {
-                               // Global Row indices (indexes) across all 
frame blocks  
-                               long endRow = 
((rowIndex+lRowId-1)/_brlenMatrix+1) * _brlenMatrix;
-                               long begRow = Math.max(endRow-_brlenMatrix+1, 
0);
-                               endRow = Math.min(endRow, _rlen);
-                               
-                               // Local Row indices (indexes) within a matrix 
block  
-                               long begRowMat = 
UtilFunctions.computeCellInBlock(begRow, _brlenMatrix);
-                               long endRowMat = 
UtilFunctions.computeCellInBlock(endRow, _brlenMatrix);
-                               
-                               long lColId = 0;
-                               while (lColId < blk.getNumColumns()) {
-                                       // Global Column index across all frame 
blocks  
-                                       long endCol = 
Math.min(lColId+_bclenMatrix-1, _clen-1);
-
-                                       // Local Column indices (indexes) 
within a matrix block  
-                                       long begColMat = 
UtilFunctions.computeCellInBlock(lColId+1, _bclenMatrix);
-                                       long endColMat = 
UtilFunctions.computeCellInBlock(endCol+1, _bclenMatrix);
-
-                                       FrameBlock tmpFrame = new FrameBlock();
-                                       tmpFrame = 
blk.sliceOperations((int)lRowId, (int)(lRowId+endRowMat-begRowMat), 
(int)lColId, (int)endCol, tmpFrame);
-
-                                       MatrixIndexes matrixIndexes = new 
MatrixIndexes(UtilFunctions.computeBlockIndex(begRow+1, 
_brlenMatrix),UtilFunctions.computeBlockIndex(lColId+1, _bclenMatrix));
-
-                                       MatrixBlock matrixBlocktmp = 
DataConverter.convertToMatrixBlock(tmpFrame);
-                                       MatrixBlock matrixBlock = 
matrixBlocktmp.leftIndexingOperations(matrixBlocktmp, (int)begRowMat, 
(int)endRowMat, (int)begColMat, (int)endColMat, new MatrixBlock(), 
UpdateType.INPLACE_PINNED);
-                                       ret.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(matrixIndexes, matrixBlock));
-                                       
-                                       lColId = endCol+1;
+                       //slice aligned matrix blocks out of given frame block
+                       long rstartix = 
UtilFunctions.computeBlockIndex(rowIndex, brlen);
+                       long rendix = 
UtilFunctions.computeBlockIndex(rowIndex+blk.getNumRows()-1, brlen);
+                       long cendix = 
UtilFunctions.computeBlockIndex(blk.getNumColumns(), bclen);
+                       for( long rix=rstartix; rix<=rendix; rix++ ) { //for 
all row blocks
+                               long rpos = UtilFunctions.computeCellIndex(rix, 
brlen, 0);
+                               int lrlen = 
UtilFunctions.computeBlockSize(rlen, rix, brlen);
+                               int fix = (int)((rpos-rowIndex>=0) ? 
rpos-rowIndex : 0);
+                               int fix2 = 
(int)Math.min(rpos+lrlen-rowIndex-1,blk.getNumRows()-1);
+                               int mix = 
UtilFunctions.computeCellInBlock(rowIndex+fix, brlen);
+                               int mix2 = mix + (fix2-fix);
+                               for( long cix=1; cix<=cendix; cix++ ) { //for 
all column blocks
+                                       long cpos = 
UtilFunctions.computeCellIndex(cix, bclen, 0);
+                                       int lclen = 
UtilFunctions.computeBlockSize(clen, cix, bclen);
+                                       MatrixBlock matrix = new 
MatrixBlock(lrlen, lclen, false);
+                                       FrameBlock frame = 
blk.sliceOperations(fix, fix2, 
+                                                       (int)cpos-1, 
(int)cpos+lclen-2, new FrameBlock());
+                                       MatrixBlock mframe = 
DataConverter.convertToMatrixBlock(frame);
+                                       ret.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(new MatrixIndexes(rix, cix), 
+                                                       
matrix.leftIndexingOperations(mframe, mix, mix2, 0, lclen-1, 
+                                                       new MatrixBlock(), 
UpdateType.INPLACE_PINNED)));
                                }
-                               lRowId += (endRow-begRow+1);
                        }
-                       
+
                        return ret;
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 97ac27c..d0a7f88 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -27,7 +27,6 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -39,6 +38,7 @@ import 
org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
 import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.io.FrameWriter;
@@ -191,9 +191,6 @@ public class FrameConverterTest extends AutomatedTestBase
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
-               SparkConf conf = new 
SparkConf().setAppName("Frame").setMaster("local");
-               conf.set("spark.kryo.classesToRegister", 
"org.apache.hadoop.io.LongWritable");
-
                try
                {
                        TestConfiguration config = 
getTestConfiguration(TEST_NAME);
@@ -461,7 +458,8 @@ public class FrameConverterTest extends AutomatedTestBase
                                OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
                                JavaPairRDD<LongWritable,Text> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, 
iinfo.inputValueClass);
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils
-                                               .textCellToBinaryBlock(sc, 
rddIn, mc, schema);
+                                               .textCellToBinaryBlock(sc, 
rddIn, mc, schema)
+                                               .mapToPair(new 
LongFrameBlockToLongWritableFrameBlock());
                                rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
                                break;
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
new file mode 100644
index 0000000..7da887e
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.io.IOException;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.io.FrameWriter;
+import org.apache.sysml.runtime.io.FrameWriterFactory;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.io.MatrixReaderFactory;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
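+/**
+ * Tests the conversion of frames into matrices (as.matrix) for frame inputs
+ * in binary, text, and csv format, with a single or multiple column blocks,
+ * dense or sparse data, in CP and Spark. The csv tests on Spark are currently
+ * disabled (see TODO below).
+ */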
+public class FrameMatrixReblockTest extends AutomatedTestBase
+{
+       private final static String TEST_DIR = "functions/frame/";
+       private final static String TEST_NAME1 = "FrameMatrixReblock";
+       private final static String TEST_CLASS_DIR = TEST_DIR + 
FrameMatrixReblockTest.class.getSimpleName() + "/";
+
+       private final static int rows = 2593;
+       private final static int cols1 = 372;
+       private final static int cols2 = 1102;
+       private final static double sparsity1 = 0.9;
+       private final static double sparsity2 = 0.3;
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"B"}));
+       }
+       
+       @Test
+       public void testFrameWriteSingleDenseBinaryCP() {
+               runFrameReblockTest(TEST_NAME1, false, false, "binary", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteSingleDenseTextcellCP() {
+               runFrameReblockTest(TEST_NAME1, false, false, "text", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteSingleDenseCsvCP() {
+               runFrameReblockTest(TEST_NAME1, false, false, "csv", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleDenseBinaryCP() {
+               runFrameReblockTest(TEST_NAME1, true,  false, "binary", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleDenseTextcellCP() {
+               runFrameReblockTest(TEST_NAME1, true, false, "text", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleDenseCsvCP() {
+               runFrameReblockTest(TEST_NAME1, true, false, "csv", 
ExecType.CP);
+       }
+
+       @Test
+       public void testFrameWriteSingleDenseBinarySpark() {
+               runFrameReblockTest(TEST_NAME1, false, false, "binary", 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteSingleDenseTextcellSpark() {
+               runFrameReblockTest(TEST_NAME1, false, false, "text", 
ExecType.SPARK);
+       }
+
+//TODO enable csv spark tests once transform over frame supported      
+//     @Test
+//     public void testFrameWriteSingleDenseCsvSpark() {
+//             runFrameReblockTest(TEST_NAME1, false, false, "csv", 
ExecType.SPARK);
+//     }
+       
+       @Test
+       public void testFrameWriteMultipleDenseBinarySpark() {
+               runFrameReblockTest(TEST_NAME1, true, false, "binary", 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleDenseTextcellSpark() {
+               runFrameReblockTest(TEST_NAME1, true, false, "text", 
ExecType.SPARK);
+       }
+       
+//     @Test
+//     public void testFrameWriteMultipleDenseCsvSpark() {
+//             runFrameReblockTest(TEST_NAME1, true, false, "csv", 
ExecType.SPARK);
+//     }
+       
+       @Test
+       public void testFrameWriteSingleSparseBinaryCP() {
+               runFrameReblockTest(TEST_NAME1, false, true, "binary", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteSingleSparseTextcellCP() {
+               runFrameReblockTest(TEST_NAME1, false, true, "text", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteSingleSparseCsvCP() {
+               runFrameReblockTest(TEST_NAME1, false, true, "csv", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleSparseBinaryCP() {
+               runFrameReblockTest(TEST_NAME1, true, true, "binary", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleSparseTextcellCP() {
+               runFrameReblockTest(TEST_NAME1, true, true, "text", 
ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleSparseCsvCP() {
+               runFrameReblockTest(TEST_NAME1, true, true, "csv", ExecType.CP);
+       }
+
+       @Test
+       public void testFrameWriteSingleSparseBinarySpark() {
+               runFrameReblockTest(TEST_NAME1, false, true, "binary", 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteSingleSparseTextcellSpark() {
+               runFrameReblockTest(TEST_NAME1, false, true, "text", 
ExecType.SPARK);
+       }
+       
+//     @Test
+//     public void testFrameWriteSingleSparseCsvSpark() {
+//             runFrameReblockTest(TEST_NAME1, false, true, "csv", 
ExecType.SPARK);
+//     }
+       
+       @Test
+       public void testFrameWriteMultipleSparseBinarySpark() {
+               runFrameReblockTest(TEST_NAME1, true, true, "binary", 
ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleSparseTextcellSpark() {
+               runFrameReblockTest(TEST_NAME1, true, true, "text", 
ExecType.SPARK);
+       }
+       
+//     @Test
+//     public void testFrameWriteMultipleSparseCsvSpark() {
+//             runFrameReblockTest(TEST_NAME1, true, true, "csv", 
ExecType.SPARK);
+//     }
+       
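+       /**
+        * Generates a random input, writes it as a frame in the given format,
+        * runs the test script on the given backend, and compares the matrix
+        * output with the generated input.
+        * 
+        * @param testname name of the test script (without file extension)
+        * @param multColBlks if true, use cols2 columns, otherwise cols1
+        * @param sparse if true, use sparsity2 for data generation, otherwise sparsity1
+        * @param ofmt format of the frame input and the matrix output
+        * @param et execution type that determines the runtime platform
+        */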
+       private void runFrameReblockTest( String testname, boolean multColBlks, 
boolean sparse, String ofmt, ExecType et)
+       {
+               //rtplatform for MR
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ){
+                       case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+                       default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+               }
+       
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
+               try
+               {
+                       int cols = multColBlks ? cols2 : cols1;
+                       double sparsity = sparse ? sparsity2 : sparsity1;
+                       
+                       TestConfiguration config = 
getTestConfiguration(testname);
+                       loadTestConfiguration(config);
+                       
+                       String HOME = SCRIPT_DIR + TEST_DIR;
+                       fullDMLScriptName = HOME + testname + ".dml";
+                       programArgs = new String[]{"-explain","-args", 
input("A"), String.valueOf(rows), 
+                                       String.valueOf(cols), output("B"), ofmt 
};
+                       
+                       //generate input data
+                       double[][] A = getRandomMatrix(rows, cols, -1, 1, 
sparsity, 7);
+                       writeFrameInput(input("A"), ofmt, A, rows, cols);
+                       
+                       //run testcase
+                       runTest(true, false, null, -1);
+                       
+                       //compare matrices
+                       double[][] B = readMatrixOutput(output("B"), ofmt, 
rows, cols);
+                       TestUtils.compareMatrices(A, B, rows, cols, 0);
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+               }
+       }
+       
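+       /**
+        * Converts the given data into a frame block and writes it with the
+        * frame writer that matches the given format.
+        * 
+        * @param fname file name of the frame to write
+        * @param ofmt format string used to select the frame writer
+        * @param frame cell values of the frame
+        * @param rows number of rows
+        * @param cols number of columns
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */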
+       private void writeFrameInput(String fname, String ofmt, double[][] 
frame, int rows, int cols) 
+               throws DMLRuntimeException, IOException 
+       {
+               MatrixBlock mb = DataConverter.convertToMatrixBlock(frame);
+               FrameBlock fb = DataConverter.convertToFrameBlock(mb);
+               
+               //write input data
+               FrameWriter writer = FrameWriterFactory.createFrameWriter(
+                               
InputInfo.getMatchingOutputInfo(InputInfo.stringExternalToInputInfo(ofmt)));
+               writer.writeFrameToHDFS(fb, fname, rows, cols);
+       }
+       
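+       /**
+        * Reads a matrix with the matrix reader that matches the given format
+        * and returns it as a two-dimensional double array.
+        * 
+        * @param fname file name of the matrix to read
+        * @param ofmt format string used to select the matrix reader
+        * @param rows number of rows
+        * @param cols number of columns
+        * @return cell values of the matrix
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */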
+       private double[][] readMatrixOutput(String fname, String ofmt, int 
rows, int cols) 
+               throws DMLRuntimeException, IOException 
+       {
+               MatrixReader reader = 
MatrixReaderFactory.createMatrixReader(InputInfo.stringExternalToInputInfo(ofmt));
+               MatrixBlock mb = reader.readMatrixFromHDFS(fname, rows, cols, 
1000, 1000, -1);
+               
+               return DataConverter.convertToDoubleMatrix(mb); 
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java
index 9724d1d..bd821b1 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java
@@ -21,14 +21,12 @@ package org.apache.sysml.test.integration.functions.misc;
 
 import java.util.HashMap;
 
-import org.junit.Assert;
 import org.junit.Test;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
-import org.apache.sysml.utils.Statistics;
 
 /**
  * Regression test for function recompile-once issue with literal replacement.
@@ -36,57 +34,44 @@ import org.apache.sysml.utils.Statistics;
  */
 public class RewritePushdownSumBinaryMult extends AutomatedTestBase 
 {
-       
        private static final String TEST_NAME1 = "RewritePushdownSumBinaryMult";
        private static final String TEST_NAME2 = 
"RewritePushdownSumBinaryMult2";
 
        private static final String TEST_DIR = "functions/misc/";
        private static final String TEST_CLASS_DIR = TEST_DIR + 
RewritePushdownSumBinaryMult.class.getSimpleName() + "/";
        
-       //private static final int rows = 1234;
-       //private static final int cols = 567;
-       private static final double eps = Math.pow(10, -10);
-       
        @Override
-       public void setUp() 
-       {
+       public void setUp() {
                TestUtils.clearAssertionInformation();
                addTestConfiguration( TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
                addTestConfiguration( TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) );
        }
        
        @Test
-       public void testPushdownSumBinaryMultNoRewrite() 
-       {
+       public void testPushdownSumBinaryMultNoRewrite() {
                testRewritePushdownSumBinaryMult( TEST_NAME1, false );
        }
        
        
        @Test
-       public void testPushdownSumBinaryMultRewrite() 
-       {
+       public void testPushdownSumBinaryMultRewrite() {
                testRewritePushdownSumBinaryMult( TEST_NAME1, true );
        }
        
-       
        @Test
-       public void testPushdownSumBinaryMultNoRewrite2() 
-       {
+       public void testPushdownSumBinaryMultNoRewrite2() {
                testRewritePushdownSumBinaryMult( TEST_NAME2, false );
        }
        
        @Test
-       public void testPushdownSumBinaryMultRewrite2() 
-       {
+       public void testPushdownSumBinaryMultRewrite2() {
                testRewritePushdownSumBinaryMult( TEST_NAME2, true );
        }
        
-       
        /**
         * 
-        * @param condition
-        * @param branchRemoval
-        * @param IPA
+        * @param testname
+        * @param rewrites
         */
        private void testRewritePushdownSumBinaryMult( String testname, boolean 
rewrites )
        {       
@@ -97,7 +82,6 @@ public class RewritePushdownSumBinaryMult extends 
AutomatedTestBase
                        TestConfiguration config = 
getTestConfiguration(testname);
                        loadTestConfiguration(config);
                        
-                       
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + testname + ".dml";
                        programArgs = new String[]{ "-stats","-args", 
output("Scalar") };
@@ -114,13 +98,9 @@ public class RewritePushdownSumBinaryMult extends 
AutomatedTestBase
                        HashMap<CellIndex, Double> dmlfile = 
readDMLScalarFromHDFS("Scalar");
                        HashMap<CellIndex, Double> rfile  = 
readRScalarFromFS("Scalar");
                        TestUtils.compareScalars(dmlfile.toString(), 
rfile.toString());
-                       System.out.println("Test case passed");
-                       
                }
-               finally
-               {
+               finally {
                        OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlag;
-               }
-               
+               }       
        }       
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/scripts/functions/frame/FrameMatrixReblock.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameMatrixReblock.dml 
b/src/test/scripts/functions/frame/FrameMatrixReblock.dml
new file mode 100644
index 0000000..67eee82
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameMatrixReblock.dml
@@ -0,0 +1,25 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($1, rows=$2, cols=$3, data_type="frame", schema="double", format=$5);
+
+B = as.matrix(A);
+write(B, $4, format=$5);


Reply via email to