Repository: incubator-systemml
Updated Branches:
  refs/heads/master 919d919be -> 9451a0fd8

[SYSTEMML-562] Spark frame reblock instruction (text), tests

This patch generalizes the existing reblock instruction for frames. In
order to stay compatible with the file-based transform, this does not
include CSV yet. Furthermore, it also includes a fix for the distributed
frame-to-matrix converter to deal with unaligned blocks.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9451a0fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9451a0fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9451a0fd

Branch: refs/heads/master
Commit: 9451a0fd89bcf02cc9ba572e8092b9f1447c4d86
Parents: 919d919
Author: Matthias Boehm <[email protected]>
Authored: Fri Jun 10 00:59:26 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jun 10 12:45:17 2016 -0700

----------------------------------------------------------------------
 .../apache/sysml/hops/recompile/Recompiler.java |  36 ++-
 .../RewriteAlgebraicSimplificationStatic.java   |   2 +-
 .../rewrite/RewriteBlockSizeAndReblock.java     |   7 +-
 .../controlprogram/caching/FrameObject.java     |   4 +
 .../context/SparkExecutionContext.java          |  26 +-
 .../spark/CSVReblockSPInstruction.java          |   2 +-
 .../instructions/spark/CastSPInstruction.java   |   5 +-
 .../spark/ReblockSPInstruction.java             |  97 +++++--
 .../spark/utils/FrameRDDConverterUtils.java     |  90 +++---
 .../functions/frame/FrameConverterTest.java     |   8 +-
 .../functions/frame/FrameMatrixReblockTest.java | 277 +++++++++++++++++++
 .../misc/RewritePushdownSumBinaryMult.java      |  38 +--
 .../functions/frame/FrameMatrixReblock.dml      |  25 ++
 13 files changed, 480 insertions(+), 137 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java index 9daefdf..ec89775 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java @@ -78,6 +78,8 @@ import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; import org.apache.sysml.runtime.controlprogram.ProgramBlock; import org.apache.sysml.runtime.controlprogram.WhileProgramBlock; +import org.apache.sysml.runtime.controlprogram.caching.CacheableData; +import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; @@ -94,6 +96,7 @@ import org.apache.sysml.runtime.instructions.mr.RandInstruction; import org.apache.sysml.runtime.instructions.mr.SeqInstruction; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; +import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.util.MapReduceTool; @@ -1907,8 +1910,8 @@ public class Recompiler public static boolean checkCPReblock(ExecutionContext ec, String varin) throws DMLRuntimeException { - MatrixObject in = ec.getMatrixObject(varin); - MatrixCharacteristics mc = in.getMatrixCharacteristics(); + CacheableData<?> obj = ec.getCacheableData(varin); + MatrixCharacteristics mc = ec.getMatrixCharacteristics(varin); long rows = mc.getRows(); long cols = mc.getCols(); @@ -1924,8 +1927,8 @@ public class Recompiler //robustness for usage through mlcontext (key/values of input rdds are //not serializable for text; 
also bufferpool rdd read only supported for // binarycell and binaryblock) - MatrixFormatMetaData iimd = (MatrixFormatMetaData) in.getMetaData(); - if( in.getRDDHandle() != null + MatrixFormatMetaData iimd = (MatrixFormatMetaData) obj.getMetaData(); + if( obj.getRDDHandle() != null && iimd.getInputInfo() != InputInfo.BinaryBlockInputInfo && iimd.getInputInfo() != InputInfo.BinaryCellInputInfo ) { return false; @@ -2061,7 +2064,7 @@ public class Recompiler * @param out * @throws DMLRuntimeException */ - public static void executeInMemoryReblock(ExecutionContext ec, String varin, String varout) + public static void executeInMemoryMatrixReblock(ExecutionContext ec, String varin, String varout) throws DMLRuntimeException { MatrixObject in = ec.getMatrixObject(varin); @@ -2079,6 +2082,29 @@ public class Recompiler /** * + * @param ec + * @param varin + * @param varout + * @throws DMLRuntimeException + */ + public static void executeInMemoryFrameReblock(ExecutionContext ec, String varin, String varout) + throws DMLRuntimeException + { + FrameObject in = ec.getFrameObject(varin); + FrameObject out = ec.getFrameObject(varout); + + //read text input frame (through buffer pool, frame object carries all relevant + //information including additional arguments for csv reblock) + FrameBlock fb = in.acquireRead(); + + //set output (incl update matrix characteristics) + out.acquireModify( fb ); + out.release(); + in.release(); + } + + /** + * * @param fname * @return * @throws DMLRuntimeException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java index c36c01f..e903a03 100644 --- 
a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java @@ -145,7 +145,7 @@ public class RewriteAlgebraicSimplificationStatic extends HopRewriteRule hi = simplifyBushyBinaryOperation(hop, hi, i); //e.g., (X*(Y*(Z%*%v))) -> (X*Y)*(Z%*%v) hi = simplifyUnaryAggReorgOperation(hop, hi, i); //e.g., sum(t(X)) -> sum(X) hi = pushdownUnaryAggTransposeOperation(hop, hi, i); //e.g., colSums(t(X)) -> t(rowSums(X)) - hi = pushdownSumBinaryMult(hop, hi, i); //e.g., sum(lamda*X) -> lamda*sum(X) + hi = pushdownSumBinaryMult(hop, hi, i); //e.g., sum(lamda*X) -> lamda*sum(X) hi = simplifyUnaryPPredOperation(hop, hi, i); //e.g., abs(ppred()) -> ppred(), others: round, ceil, floor hi = simplifyTransposedAppend(hop, hi, i); //e.g., t(cbind(t(A),t(B))) -> rbind(A,B); hi = fuseBinarySubDAGToUnaryOperation(hop, hi, i); //e.g., X*(1-X)-> sprop(X) || 1/(1+exp(-X)) -> sigmoid(X) || X*(X>0) -> selp(X) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java index ce09a6f..92dcd69 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java @@ -27,6 +27,7 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.DataOp; import org.apache.sysml.hops.FunctionOp; import org.apache.sysml.hops.Hop; +import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.hops.Hop.DataOpTypes; import org.apache.sysml.hops.Hop.FileFormatTypes; import org.apache.sysml.hops.Hop.ParamBuiltinOp; @@ -92,8 +93,9 @@ public class 
RewriteBlockSizeAndReblock extends HopRewriteRule if (hop instanceof DataOp) { // if block size does not match - if( canReblock && hop.getDataType() == DataType.MATRIX - && (hop.getRowsInBlock() != GLOBAL_BLOCKSIZE || hop.getColsInBlock() != GLOBAL_BLOCKSIZE) ) + if( canReblock //TODO change frame condition to != BINARY once transform over frames supported + && ((hop.getDataType() == DataType.MATRIX && (hop.getRowsInBlock() != GLOBAL_BLOCKSIZE || hop.getColsInBlock() != GLOBAL_BLOCKSIZE) + ||(hop.getDataType() == DataType.FRAME && OptimizerUtils.isSparkExecutionMode() && ((DataOp)hop).getInputFormatType()==FileFormatTypes.TEXT)))) { if (((DataOp) hop).getDataOpType() == DataOp.DataOpTypes.PERSISTENTREAD) { @@ -141,6 +143,7 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule } } } + //TODO remove once transform rebased to frames else if ( (hop instanceof ParameterizedBuiltinOp && ((ParameterizedBuiltinOp)hop).getOp() == ParamBuiltinOp.TRANSFORM) ) { // check if there exists a non-csv-write output. 
If yes, add reblock http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java index 84a74dd..b850301 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java @@ -86,6 +86,10 @@ public class FrameObject extends CacheableData<FrameBlock> public FrameObject(FrameObject fo) { super(fo); } + + public List<ValueType> getSchema() { + return _schema; + } public void setSchema(String schema) { if( schema.equals("*") ) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 020c49f..1516286 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -47,6 +47,7 @@ import org.apache.sysml.lops.Checkpoint; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.Program; +import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -532,11 +533,8 @@ public class SparkExecutionContext extends ExecutionContext } /** - * Keep the output rdd of spark rdd operations as meta data of matrix objects in the - * symbol table. - * - * Spark instructions should call this for all matrix outputs. - * + * Keep the output rdd of spark rdd operations as meta data of matrix/frame + * objects in the symbol table. * * @param varname * @param rdd @@ -545,23 +543,9 @@ public class SparkExecutionContext extends ExecutionContext public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) throws DMLRuntimeException { - MatrixObject mo = getMatrixObject(varname); - RDDObject rddhandle = new RDDObject(rdd, varname); - mo.setRDDHandle( rddhandle ); - } - - /** - * - * @param varname - * @param rdd - * @throws DMLRuntimeException - */ - public void setFrameRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) - throws DMLRuntimeException - { - FrameObject mo = getFrameObject(varname); + CacheableData<?> obj = getCacheableData(varname); RDDObject rddhandle = new RDDObject(rdd, varname); - mo.setRDDHandle( rddhandle ); + obj.setRDDHandle( rddhandle ); } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java index 6aa7597..98cc5a0 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java @@ -107,7 +107,7 @@ public class CSVReblockSPInstruction extends UnarySPInstruction //check for in-memory reblock (w/ lazy spark context, potential 
for latency reduction) if( Recompiler.checkCPReblock(sec, input1.getName()) ) { - Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName()); + Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName()); return; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java index 8160bd0..3562d18 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CastSPInstruction.java @@ -79,20 +79,17 @@ public class CastSPInstruction extends UnarySPInstruction in = ((JavaPairRDD<Long, FrameBlock>)in).mapToPair(new LongFrameToLongWritableFrameFunction()); out = FrameRDDConverterUtils.binaryBlockToMatrixBlock( (JavaPairRDD<LongWritable, FrameBlock>)in, mcIn, mcOut); - - sec.setRDDHandleForVariable(output.getName(), out); } else if( opcode.equals(UnaryCP.CAST_AS_FRAME_OPCODE) ) { out = FrameRDDConverterUtils.matrixBlockToBinaryBlockLongIndex(sec.getSparkContext(), (JavaPairRDD<MatrixIndexes, MatrixBlock>)in, mcIn); - - sec.setFrameRDDHandleForVariable(output.getName(), out); } else { throw new DMLRuntimeException("Unsupported spark cast operation: "+opcode); } //update output statistics and add lineage + sec.setRDDHandleForVariable(output.getName(), out); updateUnaryOutputMatrixCharacteristics(sec, input1.getName(), output.getName()); sec.addLineageRDD(output.getName(), input1.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java index 000bd63..d0128f9 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java @@ -23,18 +23,23 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.sysml.hops.recompile.Recompiler; +import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.caching.CacheableData; +import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.functions.ExtractBlockForBinaryReblock; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; +import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; @@ -77,38 
+82,61 @@ public class ReblockSPInstruction extends UnarySPInstruction @Override - @SuppressWarnings("unchecked") public void processInstruction(ExecutionContext ec) throws DMLRuntimeException { SparkExecutionContext sec = (SparkExecutionContext)ec; //set the output characteristics - MatrixObject mo = sec.getMatrixObject(input1.getName()); + CacheableData<?> obj = sec.getCacheableData(input1.getName()); MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName()); MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); mcOut.set(mc.getRows(), mc.getCols(), brlen, bclen, mc.getNonZeros()); - + //get the source format form the meta data - MatrixFormatMetaData iimd = (MatrixFormatMetaData) mo.getMetaData(); - if(iimd == null) { + MatrixFormatMetaData iimd = (MatrixFormatMetaData) obj.getMetaData(); + if(iimd == null) throw new DMLRuntimeException("Error: Metadata not found"); - } - + InputInfo iinfo = iimd.getInputInfo(); + //check for in-memory reblock (w/ lazy spark context, potential for latency reduction) if( Recompiler.checkCPReblock(sec, input1.getName()) ) { - Recompiler.executeInMemoryReblock(sec, input1.getName(), output.getName()); + if( input1.getDataType() == DataType.MATRIX ) + Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName()); + else if( input1.getDataType() == DataType.FRAME ) + Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName()); return; } - if(iimd.getInputInfo() == InputInfo.TextCellInputInfo || iimd.getInputInfo() == InputInfo.MatrixMarketInputInfo ) + //execute matrix/frame reblock + if( input1.getDataType() == DataType.MATRIX ) + processMatrixReblockInstruction(sec, iinfo); + else if( input1.getDataType() == DataType.FRAME ) + processFrameReblockInstruction(sec, iinfo); + } + + /** + * + * @param sec + * @param iinfo + * @throws DMLRuntimeException + */ + @SuppressWarnings("unchecked") + protected void processMatrixReblockInstruction(SparkExecutionContext 
sec, InputInfo iinfo) + throws DMLRuntimeException + { + MatrixObject mo = sec.getMatrixObject(input1.getName()); + MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName()); + MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); + + if(iinfo == InputInfo.TextCellInputInfo || iinfo == InputInfo.MatrixMarketInputInfo ) { //check jdk version (prevent double.parseDouble contention on <jdk8) sec.checkAndRaiseValidationWarningJDKVersion(); //get the input textcell rdd JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>) - sec.getRDDHandleForVariable(input1.getName(), iimd.getInputInfo()); + sec.getRDDHandleForVariable(input1.getName(), iinfo); //convert textcell to binary block JavaPairRDD<MatrixIndexes, MatrixBlock> out = @@ -118,7 +146,7 @@ public class ReblockSPInstruction extends UnarySPInstruction sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); } - else if(iimd.getInputInfo() == InputInfo.CSVInputInfo) { + else if(iinfo == InputInfo.CSVInputInfo) { // HACK ALERT: Until we introduces the rewrite to insert csvrblock for non-persistent read // throw new DMLRuntimeException("CSVInputInfo is not supported for ReblockSPInstruction"); CSVReblockSPInstruction csvInstruction = null; @@ -140,16 +168,16 @@ public class ReblockSPInstruction extends UnarySPInstruction csvInstruction.processInstruction(sec); return; } - else if(iimd.getInputInfo()==InputInfo.BinaryCellInputInfo) + else if(iinfo == InputInfo.BinaryCellInputInfo) { - JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = (JavaPairRDD<MatrixIndexes, MatrixCell>) sec.getRDDHandleForVariable(input1.getName(), iimd.getInputInfo()); + JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = (JavaPairRDD<MatrixIndexes, MatrixCell>) sec.getRDDHandleForVariable(input1.getName(), iinfo); JavaPairRDD<MatrixIndexes, MatrixBlock> out = RDDConverterUtils.binaryCellToBinaryBlock(sec.getSparkContext(), binaryCells, 
mcOut, outputEmptyBlocks); //put output RDD handle into symbol table sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); } - else if(iimd.getInputInfo()==InputInfo.BinaryBlockInputInfo) + else if(iinfo == InputInfo.BinaryBlockInputInfo) { /// HACK ALERT: Workaround for MLContext if(mc.getRowsPerBlock() == mcOut.getRowsPerBlock() && mc.getColsPerBlock() == mcOut.getColsPerBlock()) { @@ -162,7 +190,7 @@ public class ReblockSPInstruction extends UnarySPInstruction return; } else { - throw new DMLRuntimeException("Input RDD is not accessible through buffer pool for ReblockSPInstruction:" + iimd.getInputInfo()); + throw new DMLRuntimeException("Input RDD is not accessible through buffer pool for ReblockSPInstruction:" + iinfo); } } else @@ -180,7 +208,42 @@ public class ReblockSPInstruction extends UnarySPInstruction } } else { - throw new DMLRuntimeException("The given InputInfo is not implemented for ReblockSPInstruction:" + iimd.getInputInfo()); - } + throw new DMLRuntimeException("The given InputInfo is not implemented for ReblockSPInstruction:" + iinfo); + } + } + + /** + * + * @param sec + * @param iinfo + * @throws DMLRuntimeException + */ + @SuppressWarnings("unchecked") + protected void processFrameReblockInstruction(SparkExecutionContext sec, InputInfo iinfo) + throws DMLRuntimeException + { + FrameObject fo = sec.getFrameObject(input1.getName()); + MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName()); + + if(iinfo == InputInfo.TextCellInputInfo ) + { + //check jdk version (prevent double.parseDouble contention on <jdk8) + sec.checkAndRaiseValidationWarningJDKVersion(); + + //get the input textcell rdd + JavaPairRDD<LongWritable, Text> lines = (JavaPairRDD<LongWritable, Text>) + sec.getRDDHandleForVariable(input1.getName(), iinfo); + + //convert textcell to binary block + JavaPairRDD<Long, FrameBlock> out = + FrameRDDConverterUtils.textCellToBinaryBlock(sec.getSparkContext(), lines, 
mcOut, fo.getSchema()); + + //put output RDD handle into symbol table + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + } + else { + throw new DMLRuntimeException("The given InputInfo is not implemented for ReblockSPInstruction:" + iinfo); + } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 003b016..7c8c08b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.instructions.spark.utils; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -161,23 +162,20 @@ public class FrameRDDConverterUtils * @return * @throws DMLRuntimeException */ - public static JavaPairRDD<LongWritable, FrameBlock> textCellToBinaryBlock(JavaSparkContext sc, + public static JavaPairRDD<Long, FrameBlock> textCellToBinaryBlock(JavaSparkContext sc, JavaPairRDD<LongWritable, Text> in, MatrixCharacteristics mcOut, List<ValueType> schema ) throws DMLRuntimeException { + //replicate schema entry if necessary + List<ValueType> lschema = (schema.size()==1 && mcOut.getCols()>1) ? 
+ Collections.nCopies((int)mcOut.getCols(), schema.get(0)) : schema; //convert input rdd to serializable long/frame block JavaPairRDD<Long,Text> input = in.mapToPair(new LongWritableTextToLongTextFunction()); - //Do actual conversion - JavaPairRDD<Long,FrameBlock> output = textCellToBinaryBlockLongIndex(sc, input, mcOut, schema); - - //convert input rdd to serializable long/frame block - JavaPairRDD<LongWritable,FrameBlock> out = - output.mapToPair(new LongFrameToLongWritableFrameFunction()); - - return out; + //do actual conversion + return textCellToBinaryBlockLongIndex(sc, input, mcOut, lschema); } @@ -736,13 +734,12 @@ public class FrameRDDConverterUtils { private static final long serialVersionUID = -2654986510471835933L; - MatrixCharacteristics _mcIn, _mcOut; + private MatrixCharacteristics _mcIn; + private MatrixCharacteristics _mcOut; - public BinaryBlockToMatrixBlockFunction(MatrixCharacteristics mcIn, - MatrixCharacteristics mcOut) { - - _mcIn = mcIn; //Frame Characteristics - _mcOut = mcOut; //Matrix Characteristics + public BinaryBlockToMatrixBlockFunction(MatrixCharacteristics mcIn, MatrixCharacteristics mcOut) { + _mcIn = mcIn; //Frame Characteristics + _mcOut = mcOut; //Matrix Characteristics } @Override @@ -753,46 +750,35 @@ public class FrameRDDConverterUtils FrameBlock blk = arg0._2(); ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); + long rlen = _mcIn.getRows(); + long clen = _mcIn.getCols(); + int brlen = _mcOut.getRowsPerBlock(); + int bclen = _mcOut.getColsPerBlock(); - int _brlenMatrix = _mcOut.getRowsPerBlock(); - int _bclenMatrix = _mcOut.getColsPerBlock(); - long _rlen = _mcIn.getRows(); - long _clen = _mcIn.getCols(); - - long lRowId = 0; - while (lRowId < blk.getNumRows()) { - // Global Row indices (indexes) across all frame blocks - long endRow = ((rowIndex+lRowId-1)/_brlenMatrix+1) * _brlenMatrix; - long begRow = Math.max(endRow-_brlenMatrix+1, 0); - endRow = Math.min(endRow, 
_rlen); - - // Local Row indices (indexes) within a matrix block - long begRowMat = UtilFunctions.computeCellInBlock(begRow, _brlenMatrix); - long endRowMat = UtilFunctions.computeCellInBlock(endRow, _brlenMatrix); - - long lColId = 0; - while (lColId < blk.getNumColumns()) { - // Global Column index across all frame blocks - long endCol = Math.min(lColId+_bclenMatrix-1, _clen-1); - - // Local Column indices (indexes) within a matrix block - long begColMat = UtilFunctions.computeCellInBlock(lColId+1, _bclenMatrix); - long endColMat = UtilFunctions.computeCellInBlock(endCol+1, _bclenMatrix); - - FrameBlock tmpFrame = new FrameBlock(); - tmpFrame = blk.sliceOperations((int)lRowId, (int)(lRowId+endRowMat-begRowMat), (int)lColId, (int)endCol, tmpFrame); - - MatrixIndexes matrixIndexes = new MatrixIndexes(UtilFunctions.computeBlockIndex(begRow+1, _brlenMatrix),UtilFunctions.computeBlockIndex(lColId+1, _bclenMatrix)); - - MatrixBlock matrixBlocktmp = DataConverter.convertToMatrixBlock(tmpFrame); - MatrixBlock matrixBlock = matrixBlocktmp.leftIndexingOperations(matrixBlocktmp, (int)begRowMat, (int)endRowMat, (int)begColMat, (int)endColMat, new MatrixBlock(), UpdateType.INPLACE_PINNED); - ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(matrixIndexes, matrixBlock)); - - lColId = endCol+1; + //slice aligned matrix blocks out of given frame block + long rstartix = UtilFunctions.computeBlockIndex(rowIndex, brlen); + long rendix = UtilFunctions.computeBlockIndex(rowIndex+blk.getNumRows()-1, brlen); + long cendix = UtilFunctions.computeBlockIndex(blk.getNumColumns(), bclen); + for( long rix=rstartix; rix<=rendix; rix++ ) { //for all row blocks + long rpos = UtilFunctions.computeCellIndex(rix, brlen, 0); + int lrlen = UtilFunctions.computeBlockSize(rlen, rix, brlen); + int fix = (int)((rpos-rowIndex>=0) ? 
rpos-rowIndex : 0); + int fix2 = (int)Math.min(rpos+lrlen-rowIndex-1,blk.getNumRows()-1); + int mix = UtilFunctions.computeCellInBlock(rowIndex+fix, brlen); + int mix2 = mix + (fix2-fix); + for( long cix=1; cix<=cendix; cix++ ) { //for all column blocks + long cpos = UtilFunctions.computeCellIndex(cix, bclen, 0); + int lclen = UtilFunctions.computeBlockSize(clen, cix, bclen); + MatrixBlock matrix = new MatrixBlock(lrlen, lclen, false); + FrameBlock frame = blk.sliceOperations(fix, fix2, + (int)cpos-1, (int)cpos+lclen-2, new FrameBlock()); + MatrixBlock mframe = DataConverter.convertToMatrixBlock(frame); + ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(rix, cix), + matrix.leftIndexingOperations(mframe, mix, mix2, 0, lclen-1, + new MatrixBlock(), UpdateType.INPLACE_PINNED))); } - lRowId += (endRow-begRow+1); } - + return ret; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index 97ac27c..d0a7f88 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -27,7 +27,6 @@ import java.util.List; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -39,6 +38,7 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import 
org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; import org.apache.sysml.runtime.io.FrameWriter; @@ -191,9 +191,6 @@ public class FrameConverterTest extends AutomatedTestBase boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; DMLScript.USE_LOCAL_SPARK_CONFIG = true; - SparkConf conf = new SparkConf().setAppName("Frame").setMaster("local"); - conf.set("spark.kryo.classesToRegister", "org.apache.hadoop.io.LongWritable"); - try { TestConfiguration config = getTestConfiguration(TEST_NAME); @@ -461,7 +458,8 @@ public class FrameConverterTest extends AutomatedTestBase OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils - .textCellToBinaryBlock(sc, rddIn, mc, schema); + .textCellToBinaryBlock(sc, rddIn, mc, schema) + .mapToPair(new LongFrameBlockToLongWritableFrameBlock()); rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); break; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java new file mode 100644 index 0000000..7da887e --- /dev/null +++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.frame; + +import java.io.IOException; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.FrameWriter; +import org.apache.sysml.runtime.io.FrameWriterFactory; +import org.apache.sysml.runtime.io.MatrixReader; +import org.apache.sysml.runtime.io.MatrixReaderFactory; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; +import org.junit.Test; + +/** + * + */ +public class FrameMatrixReblockTest extends AutomatedTestBase +{ + private final static String TEST_DIR = 
"functions/frame/"; + private final static String TEST_NAME1 = "FrameMatrixReblock"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameMatrixReblockTest.class.getSimpleName() + "/"; + + private final static int rows = 2593; + private final static int cols1 = 372; + private final static int cols2 = 1102; + private final static double sparsity1 = 0.9; + private final static double sparsity2 = 0.3; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"B"})); + } + + @Test + public void testFrameWriteSingleDenseBinaryCP() { + runFrameReblockTest(TEST_NAME1, false, false, "binary", ExecType.CP); + } + + @Test + public void testFrameWriteSingleDenseTextcellCP() { + runFrameReblockTest(TEST_NAME1, false, false, "text", ExecType.CP); + } + + @Test + public void testFrameWriteSingleDenseCsvCP() { + runFrameReblockTest(TEST_NAME1, false, false, "csv", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleDenseBinaryCP() { + runFrameReblockTest(TEST_NAME1, true, false, "binary", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleDenseTextcellCP() { + runFrameReblockTest(TEST_NAME1, true, false, "text", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleDenseCsvCP() { + runFrameReblockTest(TEST_NAME1, true, false, "csv", ExecType.CP); + } + + @Test + public void testFrameWriteSingleDenseBinarySpark() { + runFrameReblockTest(TEST_NAME1, false, false, "binary", ExecType.SPARK); + } + + @Test + public void testFrameWriteSingleDenseTextcellSpark() { + runFrameReblockTest(TEST_NAME1, false, false, "text", ExecType.SPARK); + } + +//TODO enable csv spark tests once transform over frame supported +// @Test +// public void testFrameWriteSingleDenseCsvSpark() { +// runFrameReblockTest(TEST_NAME1, false, false, "csv", ExecType.SPARK); +// } + + @Test + public void testFrameWriteMultipleDenseBinarySpark() { + 
runFrameReblockTest(TEST_NAME1, true, false, "binary", ExecType.SPARK); + } + + @Test + public void testFrameWriteMultipleDenseTextcellSpark() { + runFrameReblockTest(TEST_NAME1, true, false, "text", ExecType.SPARK); + } + +// @Test +// public void testFrameWriteMultipleDenseCsvSpark() { +// runFrameReblockTest(TEST_NAME1, true, false, "csv", ExecType.SPARK); +// } + + @Test + public void testFrameWriteSingleSparseBinaryCP() { + runFrameReblockTest(TEST_NAME1, false, true, "binary", ExecType.CP); + } + + @Test + public void testFrameWriteSingleSparseTextcellCP() { + runFrameReblockTest(TEST_NAME1, false, true, "text", ExecType.CP); + } + + @Test + public void testFrameWriteSingleSparseCsvCP() { + runFrameReblockTest(TEST_NAME1, false, true, "csv", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleSparseBinaryCP() { + runFrameReblockTest(TEST_NAME1, true, true, "binary", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleSparseTextcellCP() { + runFrameReblockTest(TEST_NAME1, true, true, "text", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleSparseCsvCP() { + runFrameReblockTest(TEST_NAME1, true, true, "csv", ExecType.CP); + } + + @Test + public void testFrameWriteSingleSparseBinarySpark() { + runFrameReblockTest(TEST_NAME1, false, true, "binary", ExecType.SPARK); + } + + @Test + public void testFrameWriteSingleSparseTextcellSpark() { + runFrameReblockTest(TEST_NAME1, false, true, "text", ExecType.SPARK); + } + +// @Test +// public void testFrameWriteSingleSparseCsvSpark() { +// runFrameReblockTest(TEST_NAME1, false, true, "csv", ExecType.SPARK); +// } + + @Test + public void testFrameWriteMultipleSparseBinarySpark() { + runFrameReblockTest(TEST_NAME1, true, true, "binary", ExecType.SPARK); + } + + @Test + public void testFrameWriteMultipleSparseTextcellSpark() { + runFrameReblockTest(TEST_NAME1, true, true, "text", ExecType.SPARK); + } + +// @Test +// public void testFrameWriteMultipleSparseCsvSpark() { +// 
runFrameReblockTest(TEST_NAME1, true, true, "csv", ExecType.SPARK); +// } + + /** + * + * @param testname + * @param multColBlks + * @param ofmt + * @param et + */ + private void runFrameReblockTest( String testname, boolean multColBlks, boolean sparse, String ofmt, ExecType et) + { + //rtplatform for MR + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ){ + case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break; + default: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + try + { + int cols = multColBlks ? cols2 : cols1; + double sparsity = sparse ? sparsity2 : sparsity1; + + TestConfiguration config = getTestConfiguration(testname); + loadTestConfiguration(config); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + testname + ".dml"; + programArgs = new String[]{"-explain","-args", input("A"), String.valueOf(rows), + String.valueOf(cols), output("B"), ofmt }; + + //generate input data + double[][] A = getRandomMatrix(rows, cols, -1, 1, sparsity, 7); + writeFrameInput(input("A"), ofmt, A, rows, cols); + + //run testcase + runTest(true, false, null, -1); + + //compare matrices + double[][] B = readMatrixOutput(output("B"), ofmt, rows, cols); + TestUtils.compareMatrices(A, B, rows, cols, 0); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } + + /** + * + * @param fname + * @param ofmt + * @param frame + * @param rows + * @param cols + * @throws DMLRuntimeException + * @throws IOException + */ + private void writeFrameInput(String fname, String ofmt, double[][] frame, int rows, int cols) + throws DMLRuntimeException, IOException + { + MatrixBlock mb = DataConverter.convertToMatrixBlock(frame); + FrameBlock fb = 
DataConverter.convertToFrameBlock(mb); + + //write input data + FrameWriter writer = FrameWriterFactory.createFrameWriter( + InputInfo.getMatchingOutputInfo(InputInfo.stringExternalToInputInfo(ofmt))); + writer.writeFrameToHDFS(fb, fname, rows, cols); + } + + /** + * + * @param fname + * @param rows + * @param cols + * @param ofmt + * @return + * @throws DMLRuntimeException + * @throws IOException + */ + private double[][] readMatrixOutput(String fname, String ofmt, int rows, int cols) + throws DMLRuntimeException, IOException + { + MatrixReader reader = MatrixReaderFactory.createMatrixReader(InputInfo.stringExternalToInputInfo(ofmt)); + MatrixBlock mb = reader.readMatrixFromHDFS(fname, rows, cols, 1000, 1000, -1); + + return DataConverter.convertToDoubleMatrix(mb); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java b/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java index 9724d1d..bd821b1 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/misc/RewritePushdownSumBinaryMult.java @@ -21,14 +21,12 @@ package org.apache.sysml.test.integration.functions.misc; import java.util.HashMap; -import org.junit.Assert; import org.junit.Test; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; -import org.apache.sysml.utils.Statistics; /** * Regression test for function recompile-once issue 
with literal replacement. @@ -36,57 +34,44 @@ import org.apache.sysml.utils.Statistics; */ public class RewritePushdownSumBinaryMult extends AutomatedTestBase { - private static final String TEST_NAME1 = "RewritePushdownSumBinaryMult"; private static final String TEST_NAME2 = "RewritePushdownSumBinaryMult2"; private static final String TEST_DIR = "functions/misc/"; private static final String TEST_CLASS_DIR = TEST_DIR + RewritePushdownSumBinaryMult.class.getSimpleName() + "/"; - //private static final int rows = 1234; - //private static final int cols = 567; - private static final double eps = Math.pow(10, -10); - @Override - public void setUp() - { + public void setUp() { TestUtils.clearAssertionInformation(); addTestConfiguration( TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) ); addTestConfiguration( TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) ); } @Test - public void testPushdownSumBinaryMultNoRewrite() - { + public void testPushdownSumBinaryMultNoRewrite() { testRewritePushdownSumBinaryMult( TEST_NAME1, false ); } @Test - public void testPushdownSumBinaryMultRewrite() - { + public void testPushdownSumBinaryMultRewrite() { testRewritePushdownSumBinaryMult( TEST_NAME1, true ); } - @Test - public void testPushdownSumBinaryMultNoRewrite2() - { + public void testPushdownSumBinaryMultNoRewrite2() { testRewritePushdownSumBinaryMult( TEST_NAME2, false ); } @Test - public void testPushdownSumBinaryMultRewrite2() - { + public void testPushdownSumBinaryMultRewrite2() { testRewritePushdownSumBinaryMult( TEST_NAME2, true ); } - /** * - * @param condition - * @param branchRemoval - * @param IPA + * @param testname + * @param rewrites */ private void testRewritePushdownSumBinaryMult( String testname, boolean rewrites ) { @@ -97,7 +82,6 @@ public class RewritePushdownSumBinaryMult extends AutomatedTestBase TestConfiguration config = getTestConfiguration(testname); loadTestConfiguration(config); - 
String HOME = SCRIPT_DIR + TEST_DIR; fullDMLScriptName = HOME + testname + ".dml"; programArgs = new String[]{ "-stats","-args", output("Scalar") }; @@ -114,13 +98,9 @@ public class RewritePushdownSumBinaryMult extends AutomatedTestBase HashMap<CellIndex, Double> dmlfile = readDMLScalarFromHDFS("Scalar"); HashMap<CellIndex, Double> rfile = readRScalarFromFS("Scalar"); TestUtils.compareScalars(dmlfile.toString(), rfile.toString()); - System.out.println("Test case passed"); - } - finally - { + finally { OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldFlag; - } - + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9451a0fd/src/test/scripts/functions/frame/FrameMatrixReblock.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/frame/FrameMatrixReblock.dml b/src/test/scripts/functions/frame/FrameMatrixReblock.dml new file mode 100644 index 0000000..67eee82 --- /dev/null +++ b/src/test/scripts/functions/frame/FrameMatrixReblock.dml @@ -0,0 +1,25 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +A = read($1, rows=$2, cols=$3, data_type="frame", schema="double", format=$5); + +B = as.matrix(A); +write(B, $4, format=$5);
