Repository: incubator-systemml Updated Branches: refs/heads/master 8a0df5b85 -> eebb9665a
[SYSTEMML-562] Spark frame csv reblock instruction, tests Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eebb9665 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eebb9665 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eebb9665 Branch: refs/heads/master Commit: eebb9665a8b442ba0f6a946ffd3396ca5905c893 Parents: 8a0df5b Author: Matthias Boehm <[email protected]> Authored: Sat Jun 11 20:40:04 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Jun 11 20:40:04 2016 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/hops/OptimizerUtils.java | 8 +++ .../rewrite/RewriteBlockSizeAndReblock.java | 5 +- .../spark/CSVReblockSPInstruction.java | 68 ++++++++++++++++---- .../instructions/spark/WriteSPInstruction.java | 4 +- .../spark/utils/FrameRDDConverterUtils.java | 32 ++++----- .../functions/frame/FrameConverterTest.java | 7 +- .../functions/frame/FrameMatrixReblockTest.java | 39 ++++++----- 7 files changed, 109 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 833f5ea..aa9f7d0 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -195,6 +195,14 @@ public class OptimizerUtils */ public static final boolean ALLOW_COMBINE_FILE_INPUT_FORMAT = true; + /** + * Enables automatic csv-binary block reblock. + * + * TODO enable by default and remove once file-based transform completely + * removed via frame-based transform/transformapply + */ + public static boolean ALLOW_FRAME_CSV_REBLOCK = false; + ////////////////////// // Optimizer levels // http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java index 92dcd69..2b875a1 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteBlockSizeAndReblock.java @@ -93,9 +93,10 @@ public class RewriteBlockSizeAndReblock extends HopRewriteRule if (hop instanceof DataOp) { // if block size does not match - if( canReblock //TODO change frame condition to != BINARY once transform over frames supported + if( canReblock && ((hop.getDataType() == DataType.MATRIX && (hop.getRowsInBlock() != GLOBAL_BLOCKSIZE || hop.getColsInBlock() != GLOBAL_BLOCKSIZE) - ||(hop.getDataType() == DataType.FRAME && OptimizerUtils.isSparkExecutionMode() && ((DataOp)hop).getInputFormatType()==FileFormatTypes.TEXT)))) + ||(hop.getDataType() == DataType.FRAME && OptimizerUtils.isSparkExecutionMode() && (((DataOp)hop).getInputFormatType()==FileFormatTypes.TEXT + || ((DataOp)hop).getInputFormatType()==FileFormatTypes.CSV && OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK))))) { if (((DataOp) hop).getDataOpType() == DataOp.DataOpTypes.PERSISTENTREAD) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java index 98cc5a0..44a076d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CSVReblockSPInstruction.java @@ -22,17 +22,19 @@ package org.apache.sysml.runtime.instructions.spark; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; - import org.apache.sysml.hops.recompile.Recompiler; +import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; +import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -86,15 +88,14 @@ public class CSVReblockSPInstruction extends UnarySPInstruction } @Override - @SuppressWarnings("unchecked") public void processInstruction(ExecutionContext ec) throws DMLRuntimeException { SparkExecutionContext sec = (SparkExecutionContext) ec; //sanity check input info - MatrixObject mo = sec.getMatrixObject(input1.getName()); - MatrixFormatMetaData iimd = (MatrixFormatMetaData) mo.getMetaData(); + CacheableData<?> obj = sec.getCacheableData(input1.getName()); + MatrixFormatMetaData iimd = (MatrixFormatMetaData) obj.getMetaData(); if (iimd.getInputInfo() != InputInfo.CSVInputInfo) { throw new DMLRuntimeException("The given InputInfo is not implemented for " + "CSVReblockSPInstruction:" + iimd.getInputInfo()); @@ -107,24 +108,67 @@ public class CSVReblockSPInstruction extends UnarySPInstruction //check for in-memory reblock (w/ lazy spark context, potential for latency reduction) if( Recompiler.checkCPReblock(sec, input1.getName()) ) { - Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName()); + if( input1.getDataType() == DataType.MATRIX ) + Recompiler.executeInMemoryMatrixReblock(sec, input1.getName(), output.getName()); + else if( input1.getDataType() == DataType.FRAME ) + Recompiler.executeInMemoryFrameReblock(sec, input1.getName(), output.getName()); return; } //check jdk version (prevent double.parseDouble contention on <jdk8) sec.checkAndRaiseValidationWarningJDKVersion(); + //execute matrix/frame csvreblock + JavaPairRDD<?,?> out = null; + if( input1.getDataType() == DataType.MATRIX ) + out = processMatrixCSVReblockInstruction(sec, mcOut); + else if( input1.getDataType() == DataType.FRAME ) + out = processFrameCSVReblockInstruction(sec, mcOut); + + // put output RDD handle into symbol table + sec.setRDDHandleForVariable(output.getName(), out); + sec.addLineageRDD(output.getName(), input1.getName()); + } + + /** + * + * @param sec + * @param mcOut + * @return + * @throws DMLRuntimeException + */ + @SuppressWarnings("unchecked") + protected JavaPairRDD<MatrixIndexes,MatrixBlock> processMatrixCSVReblockInstruction(SparkExecutionContext sec, MatrixCharacteristics mcOut) + throws DMLRuntimeException + { //get input rdd (needs to be longwritable/text for consistency with meta data, in case of //serialization issues create longwritableser/textser as serializable wrappers JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, Text>) - sec.getRDDHandleForVariable(input1.getName(), iimd.getInputInfo()); + sec.getRDDHandleForVariable(input1.getName(), InputInfo.CSVInputInfo); //reblock csv to binary block - JavaPairRDD<MatrixIndexes, MatrixBlock> out = RDDConverterUtils.csvToBinaryBlock( - sec.getSparkContext(), in, mcOut, _hasHeader, _delim, _fill, _fillValue); + return RDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), + in, mcOut, _hasHeader, _delim, _fill, _fillValue); + } + + /** + * + * @param sec + * @param mcOut + * @return + * @throws DMLRuntimeException + */ + @SuppressWarnings("unchecked") + protected JavaPairRDD<Long,FrameBlock> processFrameCSVReblockInstruction(SparkExecutionContext sec, MatrixCharacteristics mcOut) + throws DMLRuntimeException + { + //get input rdd (needs to be longwritable/text for consistency with meta data, in case of + //serialization issues create longwritableser/textser as serializable wrappers + JavaPairRDD<LongWritable, Text> in = (JavaPairRDD<LongWritable, Text>) + sec.getRDDHandleForVariable(input1.getName(), InputInfo.CSVInputInfo); - // put output RDD handle into symbol table - sec.setRDDHandleForVariable(output.getName(), out); - sec.addLineageRDD(output.getName(), input1.getName()); + //reblock csv to binary block + return FrameRDDConverterUtils.csvToBinaryBlock(sec.getSparkContext(), + in, mcOut, _hasHeader, _delim, _fill, _fillValue); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java index 682e0b7..f42c29e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java @@ -38,7 +38,7 @@ import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction; import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; -import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -302,7 +302,7 @@ public class WriteSPInstruction extends SPInstruction } else if( oi == OutputInfo.BinaryBlockOutputInfo ) { - JavaPairRDD<LongWritable,FrameBlock> out = in1.mapToPair(new LongFrameBlockToLongWritableFrameBlock()); + JavaPairRDD<LongWritable,FrameBlock> out = in1.mapToPair(new LongFrameToLongWritableFrameFunction()); out.saveAsHadoopFile(fname, LongWritable.class, FrameBlock.class, SequenceFileOutputFormat.class); } else { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 7c8c08b..b2b1a97 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -75,7 +75,7 @@ public class FrameRDDConverterUtils * @return * @throws DMLRuntimeException */ - public static JavaPairRDD<LongWritable, FrameBlock> csvToBinaryBlock(JavaSparkContext sc, + public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc, JavaPairRDD<LongWritable, Text> input, MatrixCharacteristics mcOut, boolean hasHeader, String delim, boolean fill, double fillValue) throws DMLRuntimeException @@ -94,9 +94,9 @@ public class FrameRDDConverterUtils .zipWithIndex(); //zip row index //convert csv rdd to binary block rdd (w/ partial blocks) - JavaPairRDD<LongWritable, FrameBlock> out = - prepinput.mapPartitionsToPair( - new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill)); + JavaPairRDD<Long, FrameBlock> out = prepinput + .mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill)) + .mapToPair(new LongWritableFrameToLongFrameFunction()); return out; } @@ -112,7 +112,7 @@ public class FrameRDDConverterUtils * @return * @throws DMLRuntimeException */ - public static JavaPairRDD<LongWritable, FrameBlock> csvToBinaryBlock(JavaSparkContext sc, + public static JavaPairRDD<Long, FrameBlock> csvToBinaryBlock(JavaSparkContext sc, JavaRDD<String> input, MatrixCharacteristics mcOut, boolean hasHeader, String delim, boolean fill, double fillValue) throws DMLRuntimeException @@ -340,34 +340,30 @@ public class FrameRDDConverterUtils } /** - * + * */ - public static class LongFrameBlockToLongWritableFrameBlock implements PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> + public static class LongFrameToLongWritableFrameFunction implements PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> { - private static final long serialVersionUID = 3201887196237766424L; + private static final long serialVersionUID = -1467314923206783333L; @Override public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, FrameBlock> arg0) throws Exception { - return new Tuple2<LongWritable,FrameBlock>(new LongWritable(arg0._1), arg0._2); + return new Tuple2<LongWritable, FrameBlock>(new LongWritable(arg0._1), arg0._2); } } - - - + /** * */ - public static class LongFrameToLongWritableFrameFunction implements PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> + public static class LongWritableFrameToLongFrameFunction implements PairFunction<Tuple2<LongWritable,FrameBlock>,Long,FrameBlock> { - - private static final long serialVersionUID = -1467314923206783333L; + private static final long serialVersionUID = -1232439643533739078L; @Override - public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, FrameBlock> arg0) throws Exception { - return new Tuple2<LongWritable, FrameBlock>(new LongWritable(arg0._1), arg0._2); + public Tuple2<Long, FrameBlock> call(Tuple2<LongWritable, FrameBlock> arg0) throws Exception { + return new Tuple2<Long, FrameBlock>(arg0._1.get(), arg0._2); } } - /** * http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index d0a7f88..e271824 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -38,7 +38,7 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; -import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; import org.apache.sysml.runtime.io.FrameWriter; @@ -440,7 +440,8 @@ public class FrameConverterTest extends AutomatedTestBase OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils - .csvToBinaryBlock(sc, rddIn, mc, false, ",", false, 0); + .csvToBinaryBlock(sc, rddIn, mc, false, ",", false, 0) + .mapToPair(new LongFrameToLongWritableFrameFunction()); rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); break; } @@ -459,7 +460,7 @@ public class FrameConverterTest extends AutomatedTestBase JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils .textCellToBinaryBlock(sc, rddIn, mc, schema) - .mapToPair(new LongFrameBlockToLongWritableFrameBlock()); + .mapToPair(new LongFrameToLongWritableFrameFunction()); rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); break; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eebb9665/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java index 7da887e..ecc958b 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixReblockTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.lops.LopProperties.ExecType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.io.FrameWriter; @@ -99,11 +100,10 @@ public class FrameMatrixReblockTest extends AutomatedTestBase runFrameReblockTest(TEST_NAME1, false, false, "text", ExecType.SPARK); } -//TODO enable csv spark tests once transform over frame supported -// @Test -// public void testFrameWriteSingleDenseCsvSpark() { -// runFrameReblockTest(TEST_NAME1, false, false, "csv", ExecType.SPARK); -// } + @Test + public void testFrameWriteSingleDenseCsvSpark() { + runFrameReblockTest(TEST_NAME1, false, false, "csv", ExecType.SPARK); + } @Test public void testFrameWriteMultipleDenseBinarySpark() { @@ -115,10 +115,10 @@ public class FrameMatrixReblockTest extends AutomatedTestBase runFrameReblockTest(TEST_NAME1, true, false, "text", ExecType.SPARK); } -// @Test -// public void testFrameWriteMultipleDenseCsvSpark() { -// runFrameReblockTest(TEST_NAME1, true, false, "csv", ExecType.SPARK); -// } + @Test + public void testFrameWriteMultipleDenseCsvSpark() { + runFrameReblockTest(TEST_NAME1, true, false, "csv", ExecType.SPARK); + } @Test public void testFrameWriteSingleSparseBinaryCP() { @@ -160,10 +160,10 @@ public class FrameMatrixReblockTest extends AutomatedTestBase runFrameReblockTest(TEST_NAME1, false, true, "text", ExecType.SPARK); } -// @Test -// public void testFrameWriteSingleSparseCsvSpark() { -// runFrameReblockTest(TEST_NAME1, false, true, "csv", ExecType.SPARK); -// } + @Test + public void testFrameWriteSingleSparseCsvSpark() { + runFrameReblockTest(TEST_NAME1, false, true, "csv", ExecType.SPARK); + } @Test public void testFrameWriteMultipleSparseBinarySpark() { @@ -175,10 +175,10 @@ public class FrameMatrixReblockTest extends AutomatedTestBase runFrameReblockTest(TEST_NAME1, true, true, "text", ExecType.SPARK); } -// @Test -// public void testFrameWriteMultipleSparseCsvSpark() { -// runFrameReblockTest(TEST_NAME1, true, true, "csv", ExecType.SPARK); -// } + @Test + public void testFrameWriteMultipleSparseCsvSpark() { + runFrameReblockTest(TEST_NAME1, true, true, "csv", ExecType.SPARK); + } /** * @@ -201,6 +201,10 @@ public class FrameMatrixReblockTest extends AutomatedTestBase if( rtplatform == RUNTIME_PLATFORM.SPARK ) DMLScript.USE_LOCAL_SPARK_CONFIG = true; + boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK; + if( ofmt.equals("csv") ) + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true; + try { int cols = multColBlks ? cols2 : cols1; @@ -231,6 +235,7 @@ public class FrameMatrixReblockTest extends AutomatedTestBase finally { rtplatform = platformOld; DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld; } }
