Repository: incubator-systemml Updated Branches: refs/heads/master 0d615e51c -> 479497017
[SYSTEMML-567] Full buffer pool integration frames (rdd read/write) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/47949701 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/47949701 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/47949701 Branch: refs/heads/master Commit: 479497017f234135ef1de6fb712e5d70ee5346b5 Parents: 0d615e5 Author: Matthias Boehm <[email protected]> Authored: Wed Jun 1 20:46:31 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Wed Jun 1 23:49:09 2016 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/FrameObject.java | 46 ++++++++++- .../context/SparkExecutionContext.java | 83 ++++++++++++++++++++ .../sysml/runtime/matrix/data/OutputInfo.java | 8 +- .../functions/frame/FrameMatrixCastingTest.java | 3 +- 4 files changed, 131 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java index 2df32c6..84a74dd 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java @@ -30,6 +30,7 @@ import org.apache.sysml.parser.DataExpression; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import 
org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; @@ -41,6 +42,7 @@ import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; import org.apache.sysml.runtime.matrix.MetaData; import org.apache.sysml.runtime.matrix.data.FileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.OutputInfo; public class FrameObject extends CacheableData<FrameBlock> @@ -166,8 +168,38 @@ public class FrameObject extends CacheableData<FrameBlock> protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean status) throws IOException { - //TODO support for distributed frame representations - throw new IOException("Not implemented yet."); + //note: the read of a frame block from an RDD might trigger + //lazy evaluation of pending transformations. + RDDObject lrdd = rdd; + + //prepare return status (by default only collect) + status.setValue(false); + + MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData; + MatrixCharacteristics mc = iimd.getMatrixCharacteristics(); + FrameBlock fb = null; + + try { + //prevent unnecessary collect through rdd checkpoint + if( rdd.allowsShortCircuitCollect() ) { + lrdd = (RDDObject)rdd.getLineageChilds().get(0); + } + + //collect frame block from binary block RDD + int rlen = (int)mc.getRows(); + int clen = (int)mc.getCols(); + fb = SparkExecutionContext.toFrameBlock(lrdd, _schema, rlen, clen); + } + catch(DMLRuntimeException ex) { + throw new IOException(ex); + } + + //sanity check correct output + if( fb == null ) { + throw new IOException("Unable to load frame from rdd: "+lrdd.getVarName()); + } + + return fb; } @Override @@ -183,7 +215,13 @@ public class FrameObject extends CacheableData<FrameBlock> protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt) throws IOException,
DMLRuntimeException { - //TODO support for distributed frame representations - throw new IOException("Not implemented yet."); + //prepare output info + MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData; + OutputInfo oinfo = (ofmt != null ? OutputInfo.stringToOutputInfo(ofmt) + : InputInfo.getMatchingOutputInfo (iimd.getInputInfo())); + + //note: the write of an RDD to HDFS might trigger + //lazy evaluation of pending transformations. + SparkExecutionContext.writeFrameRDDtoHDFS(rdd, fname, oinfo); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 956ea4e..020c49f 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -44,6 +44,7 @@ import org.apache.sysml.api.MLContextProxy; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.lops.Checkpoint; +import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.Program; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; @@ -63,6 +64,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFuncti import org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction; import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction; import org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction; +import
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -855,6 +857,63 @@ public class SparkExecutionContext extends ExecutionContext return out; } + + /** + * Collects the frame pair RDD wrapped by the given RDD object into a single frame block. + * @param rdd RDD object wrapping a {@code JavaPairRDD<Long,FrameBlock>} + * @param schema schema of the output frame + * @param rlen number of rows of the output frame + * @param clen number of columns of the output frame (passed through) + * @return collected frame block + * @throws DMLRuntimeException + */ + @SuppressWarnings("unchecked") + public static FrameBlock toFrameBlock(RDDObject rdd, List<ValueType> schema, int rlen, int clen) + throws DMLRuntimeException + { + JavaPairRDD<Long,FrameBlock> lrdd = (JavaPairRDD<Long,FrameBlock>) rdd.getRDD(); + return toFrameBlock(lrdd, schema, rlen, clen); + } + + /** + * Collects a frame pair RDD (key = 1-based start row of each block) into one frame block. + * @param rdd pair RDD of (1-based start row index, frame block) + * @param schema schema of the output frame + * @param rlen number of rows of the output frame + * @param clen number of columns of the output frame (currently unused) + * @return collected frame block + * @throws DMLRuntimeException + */ + public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, List<ValueType> schema, int rlen, int clen) + throws DMLRuntimeException + { + long t0 = DMLScript.STATISTICS ?
System.nanoTime() : 0; + + //create output frame block (w/ lazy allocation) + FrameBlock out = new FrameBlock(schema); + out.ensureAllocatedColumns(rlen); + + List<Tuple2<Long,FrameBlock>> list = rdd.collect(); + + //copy blocks one-at-a-time into output frame block + for( Tuple2<Long,FrameBlock> keyval : list ) + { + //unpack index-block pair + int ix = (int)(keyval._1() - 1); + FrameBlock block = keyval._2(); + + //copy into output frame (row and column upper bounds are inclusive) + out.copy( ix, ix+block.getNumRows()-1, + 0, block.getNumColumns()-1, block ); + } + + if (DMLScript.STATISTICS) { + Statistics.accSparkCollectTime(System.nanoTime() - t0); + Statistics.incSparkCollectCount(1); + } + + return out; + } /** * @@ -879,6 +938,30 @@ public class SparkExecutionContext extends ExecutionContext return nnz; } + /** + * @param rdd RDD object wrapping a {@code JavaPairRDD<Long,FrameBlock>} + * @param path output file path + * @param oinfo output info; binary block output is written with LongWritable keys + */ + @SuppressWarnings("unchecked") + public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo ) + { + JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, FrameBlock>) rdd.getRDD(); + + //convert keys to writables if necessary + if( oinfo == OutputInfo.BinaryBlockOutputInfo ) { + lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair( + new LongFrameToLongWritableFrameFunction()); + oinfo = OutputInfo.BinaryBlockFrameOutputInfo; + } + + //save file is an action which triggers lazy evaluation of pending transformations + lrdd.saveAsHadoopFile(path, + oinfo.outputKeyClass, + oinfo.outputValueClass, + oinfo.outputFormatClass); + } + /////////////////////////////////////////// // Cleanup of RDDs and Broadcast variables /////// http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java index e0c7bcb..423607c 100644 ---
a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java @@ -24,13 +24,13 @@ import java.io.Serializable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TextOutputFormat; - import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.mapred.CSVWriteReducer.RowBlockForTextOutput; import org.apache.sysml.runtime.matrix.sort.CompactOutputFormat; @@ -59,8 +59,10 @@ public class OutputInfo implements Serializable NullWritable.class, Text.class); public static final OutputInfo BinaryCellOutputInfo=new OutputInfo(SequenceFileOutputFormat.class, MatrixIndexes.class, MatrixCell.class); - public static final OutputInfo BinaryBlockOutputInfo=new OutputInfo(SequenceFileOutputFormat.class, - MatrixIndexes.class, MatrixBlock.class); + public static final OutputInfo BinaryBlockOutputInfo=new OutputInfo( + SequenceFileOutputFormat.class, MatrixIndexes.class, MatrixBlock.class); + public static final OutputInfo BinaryBlockFrameOutputInfo=new OutputInfo( + SequenceFileOutputFormat.class, LongWritable.class, FrameBlock.class); public static final OutputInfo OutputInfoForSortInput=new OutputInfo(SequenceFileOutputFormat.class, DoubleWritable.class, IntWritable.class); public static final OutputInfo OutputInfoForSortOutput = new OutputInfo(CompactOutputFormat.class, http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java ---------------------------------------------------------------------- diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java index 0baf9c2..5edada9 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java @@ -119,7 +119,6 @@ public class FrameMatrixCastingTest extends AutomatedTestBase runFrameCastingTest(TEST_NAME1, true, ValueType.DOUBLE, ExecType.SPARK); } - /*TODO write distributed frame missing @Test public void testMatrix2FrameCastSingleSpark() { runFrameCastingTest(TEST_NAME2, false, null, ExecType.SPARK); @@ -129,7 +128,7 @@ public class FrameMatrixCastingTest extends AutomatedTestBase public void testMatrix2FrameCastMultiSpark() { runFrameCastingTest(TEST_NAME2, true, null, ExecType.SPARK); } - */ + /** *
