Repository: incubator-systemml
Updated Branches:
  refs/heads/master 0d615e51c -> 479497017


[SYSTEMML-567] Full buffer pool integration frames (rdd read/write)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/47949701
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/47949701
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/47949701

Branch: refs/heads/master
Commit: 479497017f234135ef1de6fb712e5d70ee5346b5
Parents: 0d615e5
Author: Matthias Boehm <[email protected]>
Authored: Wed Jun 1 20:46:31 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Jun 1 23:49:09 2016 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/FrameObject.java     | 46 ++++++++++-
 .../context/SparkExecutionContext.java          | 83 ++++++++++++++++++++
 .../sysml/runtime/matrix/data/OutputInfo.java   |  8 +-
 .../functions/frame/FrameMatrixCastingTest.java |  3 +-
 4 files changed, 131 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index 2df32c6..84a74dd 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -30,6 +30,7 @@ import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
@@ -41,6 +42,7 @@ import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.MetaData;
 import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 
 public class FrameObject extends CacheableData<FrameBlock>
@@ -166,8 +168,38 @@ public class FrameObject extends CacheableData<FrameBlock>
        protected FrameBlock readBlobFromRDD(RDDObject rdd, MutableBoolean 
status)
                        throws IOException 
        {
-               //TODO support for distributed frame representations
-               throw new IOException("Not implemented yet.");
+               //note: the read of a frame block from an RDD might trigger
+               //lazy evaluation of pending transformations.
+               RDDObject lrdd = rdd;
+
+               //prepare return status (by default only collect)
+               status.setValue(false);
+               
+               MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
+               MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
+               FrameBlock fb = null;
+               
+               try  {
+                       //prevent unnecessary collect through rdd checkpoint (read from first lineage child instead)
+                       if( rdd.allowsShortCircuitCollect() ) {
+                               lrdd = (RDDObject)rdd.getLineageChilds().get(0);
+                       }
+                       
+                       //collect frame block from binary block RDD (dimensions are narrowed to int)
+                       int rlen = (int)mc.getRows();
+                       int clen = (int)mc.getCols();
+                       fb = SparkExecutionContext.toFrameBlock(lrdd, _schema, 
rlen, clen);     
+               }
+               catch(DMLRuntimeException ex) {
+                       throw new IOException(ex);
+               }
+               
+               //sanity check correct output (error message names the rdd variable that was read)
+               if( fb == null ) {
+                       throw new IOException("Unable to load frame from rdd: 
"+lrdd.getVarName());
+               }
+               
+               return fb;
        }
 
        @Override
@@ -183,7 +215,13 @@ public class FrameObject extends CacheableData<FrameBlock>
        protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, 
String ofmt) 
                throws IOException, DMLRuntimeException 
        {
-               //TODO support for distributed frame representations
-               throw new IOException("Not implemented yet.");
+               //prepare output info: an explicit format string takes precedence, otherwise use the output info matching the metadata's input info
+        MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
+           OutputInfo oinfo = (ofmt != null ? 
OutputInfo.stringToOutputInfo(ofmt) 
+                : InputInfo.getMatchingOutputInfo (iimd.getInputInfo()));
+           
+               //note: the write of an RDD to HDFS might trigger
+               //lazy evaluation of pending transformations.                   
        
+               SparkExecutionContext.writeFrameRDDtoHDFS(rdd, fname, oinfo);   
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 956ea4e..020c49f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -44,6 +44,7 @@ import org.apache.sysml.api.MLContextProxy;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.Checkpoint;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.Program;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
@@ -63,6 +64,7 @@ import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFuncti
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -855,6 +857,63 @@ public class SparkExecutionContext extends ExecutionContext
                                
                return out;
        }
+
+       /**
+        * Casts the RDD held by the given handle to {@code JavaPairRDD<Long,FrameBlock>} and delegates to the typed overload.
+        * @param rdd rdd handle whose {@code getRDD()} result is cast to {@code JavaPairRDD<Long,FrameBlock>}
+        * @param schema column value types, passed through unchanged
+        * @param rlen row count, passed through unchanged
+        * @param clen column count, passed through unchanged
+        * @return the frame block produced by the typed overload
+        * @throws DMLRuntimeException declared by the typed overload this method delegates to
+        */
+       @SuppressWarnings("unchecked")
+       public static FrameBlock toFrameBlock(RDDObject rdd, List<ValueType> 
schema, int rlen, int clen) 
+               throws DMLRuntimeException 
+       {
+               JavaPairRDD<Long,FrameBlock> lrdd = 
(JavaPairRDD<Long,FrameBlock>) rdd.getRDD();
+               return toFrameBlock(lrdd, schema, rlen, clen);
+       }
+       
+       /**
+        * Collects all blocks of the given frame rdd and copies them into a single output frame block.
+        * @param rdd pair rdd of frame blocks; each key minus one is used as the 0-based start row of its block
+        * @param schema column value types used to construct the output frame block
+        * @param rlen row count handed to {@code ensureAllocatedColumns} of the output frame block
+        * @param clen column count (not read by this method)
+        * @return the output frame block holding the copied contents of all collected blocks
+        * @throws DMLRuntimeException declared for failures while building the output frame block
+        */
+       public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, 
List<ValueType> schema, int rlen, int clen) 
+               throws DMLRuntimeException
+       {
+               long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+
+               //create output frame block and ensure its columns are allocated for rlen
+               FrameBlock out = new FrameBlock(schema);
+               out.ensureAllocatedColumns(rlen);
+               
+               List<Tuple2<Long,FrameBlock>> list = rdd.collect();
+               
+               //copy blocks one-at-a-time into output frame block
+               for( Tuple2<Long,FrameBlock> keyval : list )
+               {
+                       //unpack index-block pair (key minus one gives the 0-based start row)
+                       int ix = (int)(keyval._1() - 1);
+                       FrameBlock block = keyval._2();
+               
+                       //copy into output frame (upper bounds are inclusive for rows and columns alike)
+                       out.copy( ix, ix+block.getNumRows()-1, 
+                                         0, block.getNumColumns()-1, block );  
+               }
+               
+               if (DMLScript.STATISTICS) {
+                       Statistics.accSparkCollectTime(System.nanoTime() - t0);
+                       Statistics.incSparkCollectCount(1);
+               }
+               
+               return out;
+       }
        
        /**
         * 
@@ -879,6 +938,30 @@ public class SparkExecutionContext extends ExecutionContext
                return nnz;
        }
        
+       /** Saves the frame rdd behind the given handle as a hadoop file, using the classes of the given output info.
+        * @param rdd rdd handle whose {@code getRDD()} result is cast to a pair rdd with FrameBlock values
+        * @param path target path handed to {@code saveAsHadoopFile}
+        * @param oinfo output info providing key, value, and output format classes; BinaryBlockOutputInfo is replaced by BinaryBlockFrameOutputInfo
+        */
+       @SuppressWarnings("unchecked")
+       public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, 
OutputInfo oinfo )
+       {
+               JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, 
FrameBlock>) rdd.getRDD();
+               
+               //convert keys to writables if necessary (binary block only; switches to the frame-specific output info)
+               if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
+                       lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair(
+                                       new 
LongFrameToLongWritableFrameFunction());
+                       oinfo = OutputInfo.BinaryBlockFrameOutputInfo;
+               }
+       
+               //save file is an action (note: no nnz maintenance is done in this method)
+               lrdd.saveAsHadoopFile(path, 
+                               oinfo.outputKeyClass, 
+                               oinfo.outputValueClass, 
+                               oinfo.outputFormatClass);
+       }
+       
        ///////////////////////////////////////////
        // Cleanup of RDDs and Broadcast variables
        ///////

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
index e0c7bcb..423607c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OutputInfo.java
@@ -24,13 +24,13 @@ import java.io.Serializable;
 
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.matrix.mapred.CSVWriteReducer.RowBlockForTextOutput;
 import org.apache.sysml.runtime.matrix.sort.CompactOutputFormat;
@@ -59,8 +59,10 @@ public class OutputInfo implements Serializable
                        NullWritable.class, Text.class);
        public static final OutputInfo BinaryCellOutputInfo=new 
OutputInfo(SequenceFileOutputFormat.class, 
                        MatrixIndexes.class, MatrixCell.class);
-       public static final OutputInfo BinaryBlockOutputInfo=new 
OutputInfo(SequenceFileOutputFormat.class, 
-                       MatrixIndexes.class, MatrixBlock.class);
+       public static final OutputInfo BinaryBlockOutputInfo=new OutputInfo(
+                       SequenceFileOutputFormat.class, MatrixIndexes.class, 
MatrixBlock.class);
+       public static final OutputInfo BinaryBlockFrameOutputInfo=new 
OutputInfo(
+                       SequenceFileOutputFormat.class, LongWritable.class, 
FrameBlock.class);
        public static final OutputInfo OutputInfoForSortInput=new 
OutputInfo(SequenceFileOutputFormat.class, 
                        DoubleWritable.class, IntWritable.class);
        public static final OutputInfo OutputInfoForSortOutput = new 
OutputInfo(CompactOutputFormat.class,

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/47949701/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
index 0baf9c2..5edada9 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixCastingTest.java
@@ -119,7 +119,6 @@ public class FrameMatrixCastingTest extends 
AutomatedTestBase
                runFrameCastingTest(TEST_NAME1, true, ValueType.DOUBLE, 
ExecType.SPARK);
        }
 
-       /*TODO write distributed frame missing
        @Test
        public void testMatrix2FrameCastSingleSpark() {
                runFrameCastingTest(TEST_NAME2, false, null, ExecType.SPARK);
@@ -129,7 +128,7 @@ public class FrameMatrixCastingTest extends 
AutomatedTestBase
        public void testMatrix2FrameCastMultiSpark() {
                runFrameCastingTest(TEST_NAME2, true, null, ExecType.SPARK);
        }
-       */
+       
        
        /**
         * 

Reply via email to