Repository: incubator-systemml
Updated Branches:
  refs/heads/master cf4e5ab6e -> c76b01a75


[SYSTEMML-562] Spark frame write instruction (binary/text/csv), tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c76b01a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c76b01a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c76b01a7

Branch: refs/heads/master
Commit: c76b01a753837150c590c79557acdccb9d756a7e
Parents: cf4e5ab
Author: Matthias Boehm <[email protected]>
Authored: Wed Jun 8 23:24:18 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jun 9 18:00:50 2016 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/DataOp.java |   5 +-
 .../instructions/spark/WriteSPInstruction.java  | 257 ++++++++++++-------
 .../functions/ConvertFrameBlockToIJVLines.java  |  43 ++--
 .../spark/utils/FrameRDDConverterUtils.java     |  65 +++--
 .../functions/frame/FrameConverterTest.java     |   7 +-
 .../functions/frame/FrameMatrixWriteTest.java   | 193 ++++++++++++++
 .../functions/frame/FrameMatrixWrite.dml        |  26 ++
 7 files changed, 454 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/hops/DataOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/DataOp.java 
b/src/main/java/org/apache/sysml/hops/DataOp.java
index d8e5519..b4dbae4 100644
--- a/src/main/java/org/apache/sysml/hops/DataOp.java
+++ b/src/main/java/org/apache/sysml/hops/DataOp.java
@@ -37,7 +37,6 @@ import org.apache.sysml.runtime.util.LocalFileUtils;
 
 public class DataOp extends Hop 
 {
-
        private DataOpTypes _dataop;
        private String _fileName = null;
        
@@ -436,8 +435,8 @@ public class DataOp extends Hop
                {
                        checkAndSetForcedPlatform();
 
-                       //additional check for write only (TODO: remove frame 
here once support for distributed)
-                       if( getDataType()==DataType.SCALAR || 
getDataType()==DataType.FRAME )
+                       //additional check for write only
+                       if( getDataType()==DataType.SCALAR || 
(getDataType()==DataType.FRAME && REMOTE==ExecType.MR) )
                                _etypeForced = ExecType.CP;
                        
                        if( _etypeForced != null )                      

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 3ea03ef..682e0b7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -23,11 +23,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Random;
 
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-
+import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -36,11 +37,15 @@ import 
org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
 import 
org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import 
org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -54,6 +59,7 @@ public class WriteSPInstruction extends SPInstruction
        private FileFormatProperties formatProperties;
        
        //scalars might occur for transform
+       // TODO remove once transform over frames supported
        private boolean isInputMatrixBlock = true; 
        
        public WriteSPInstruction(String opcode, String istr) {
@@ -87,10 +93,9 @@ public class WriteSPInstruction extends SPInstruction
                
                
//SPARK°write°_mVar2·MATRIX·DOUBLE°./src/test/scripts/functions/data/out/B·SCALAR·STRING·true°matrixmarket·SCALAR·STRING·true
                // _mVar2·MATRIX·DOUBLE
-               CPOperand in1=null, in2=null, in3=null;
-               in1 = new CPOperand(parts[1]);
-               in2 = new CPOperand(parts[2]);
-               in3 = new CPOperand(parts[3]);
+               CPOperand in1 = new CPOperand(parts[1]);
+               CPOperand in2 = new CPOperand(parts[2]);
+               CPOperand in3 = new CPOperand(parts[3]);
                
                WriteSPInstruction inst = new WriteSPInstruction(in1, in2, in3, 
opcode, str); 
                
@@ -141,110 +146,172 @@ public class WriteSPInstruction extends SPInstruction
                        //prepare output info according to meta data
                        String outFmt = input3.getName();
                        OutputInfo oi = OutputInfo.stringToOutputInfo(outFmt);
-                               
-                       //get input rdd
-                       JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-                       MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
                        
-                       if(    oi == OutputInfo.MatrixMarketOutputInfo
-                               || oi == OutputInfo.TextCellOutputInfo     ) 
-                       {
-                               //recompute nnz if necessary (required for 
header if matrix market)
-                               if ( isInputMatrixBlock && !mc.nnzKnown() )
-                                       mc.setNonZeros( 
SparkUtils.computeNNZFromBlocks(in1) );
-                               
-                               JavaRDD<String> header = null;                  
        
-                               if(outFmt.equalsIgnoreCase("matrixmarket")) {
-                                       ArrayList<String> headerContainer = new 
ArrayList<String>(1);
-                                       // First output MM header
-                                       String headerStr = "%%MatrixMarket 
matrix coordinate real general\n" +
-                                                       // output number of 
rows, number of columns and number of nnz
-                                                       mc.getRows() + " " + 
mc.getCols() + " " + mc.getNonZeros();
-                                       headerContainer.add(headerStr);
-                                       header = 
sec.getSparkContext().parallelize(headerContainer);
-                               }
-                               
-                               JavaRDD<String> ijv = in1.flatMap(new 
ConvertMatrixBlockToIJVLines(mc.getRowsPerBlock(), mc.getColsPerBlock()));
-                               if(header != null)
-                                       customSaveTextFile(header.union(ijv), 
fname, true);
-                               else
-                                       customSaveTextFile(ijv, fname, false);
-                       }
-                       else if( oi == OutputInfo.CSVOutputInfo ) 
-                       {
-                               JavaRDD<String> out = null;
-                               Accumulator<Double> aNnz = null;
-                               
-                               if ( isInputMatrixBlock ) {
-                                       //piggyback nnz computation on actual 
write
-                                       if( !mc.nnzKnown() ) {
-                                               aNnz = 
sec.getSparkContext().accumulator(0L);
-                                               in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
-                                       }       
-                                       
-                                       out = 
RDDConverterUtils.binaryBlockToCsv(in1, mc, 
-                                                       
(CSVFileFormatProperties) formatProperties, true);
-                               }
-                               else 
-                               {
-                                       // This case is applicable when the CSV 
output from transform() is written out
-                                       @SuppressWarnings("unchecked")
-                                       JavaPairRDD<Long,String> rdd = 
(JavaPairRDD<Long, String>) 
(sec.getMatrixObject(input1.getName())).getRDDHandle().getRDD();
-                                       out = rdd.values(); 
-
-                                       String sep = ",";
-                                       boolean hasHeader = false;
-                                       if(formatProperties != null) {
-                                               sep = 
((CSVFileFormatProperties) formatProperties).getDelim();
-                                               hasHeader = 
((CSVFileFormatProperties) formatProperties).hasHeader();
-                                       }
-                                       
-                                       if(hasHeader) {
-                                               StringBuffer buf = new 
StringBuffer();
-                                       for(int j = 1; j < mc.getCols(); j++) {
-                                               if(j != 1) {
-                                                       buf.append(sep);
-                                               }
-                                               buf.append("C" + j);
-                                       }
-                                       ArrayList<String> headerContainer = new 
ArrayList<String>(1);
-                                       headerContainer.add(0, buf.toString());
-                                       JavaRDD<String> header = 
sec.getSparkContext().parallelize(headerContainer);
-                                       out = header.union(out);
-                                       }
-                               }
-                               
-                               customSaveTextFile(out, fname, false);
-                               
-                               if( isInputMatrixBlock && !mc.nnzKnown() )
-                                       
mc.setNonZeros((long)aNnz.value().longValue());
+                       //core matrix/frame write
+                       if( input1.getDataType()==DataType.MATRIX )
+                               processMatrixWriteInstruction(sec, fname, oi);
+                       else
+                               processFrameWriteInstruction(sec, fname, oi);
+               }
+               catch(IOException ex)
+               {
+                       throw new DMLRuntimeException("Failed to process write 
instruction", ex);
+               }
+       }
+       
+       /**
+        * 
+        * @param sec
+        * @param fname
+        * @param oi
+        * @throws DMLRuntimeException
+        * @throws IOException 
+        */
+       protected void processMatrixWriteInstruction(SparkExecutionContext sec, 
String fname, OutputInfo oi) 
+               throws DMLRuntimeException, IOException
+       {
+               //get input rdd
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+               MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
+               
+               if(    oi == OutputInfo.MatrixMarketOutputInfo
+                       || oi == OutputInfo.TextCellOutputInfo     ) 
+               {
+                       //recompute nnz if necessary (required for header if 
matrix market)
+                       if ( isInputMatrixBlock && !mc.nnzKnown() )
+                               mc.setNonZeros( 
SparkUtils.computeNNZFromBlocks(in1) );
+                       
+                       JavaRDD<String> header = null;                          
+                       if( oi == OutputInfo.MatrixMarketOutputInfo  ) {
+                               ArrayList<String> headerContainer = new 
ArrayList<String>(1);
+                               // First output MM header
+                               String headerStr = "%%MatrixMarket matrix 
coordinate real general\n" +
+                                               // output number of rows, 
number of columns and number of nnz
+                                               mc.getRows() + " " + 
mc.getCols() + " " + mc.getNonZeros();
+                               headerContainer.add(headerStr);
+                               header = 
sec.getSparkContext().parallelize(headerContainer);
                        }
-                       else if( oi == OutputInfo.BinaryBlockOutputInfo ) {
+                       
+                       JavaRDD<String> ijv = in1.flatMap(new 
ConvertMatrixBlockToIJVLines(mc.getRowsPerBlock(), mc.getColsPerBlock()));
+                       if(header != null)
+                               customSaveTextFile(header.union(ijv), fname, 
true);
+                       else
+                               customSaveTextFile(ijv, fname, false);
+               }
+               else if( oi == OutputInfo.CSVOutputInfo ) 
+               {
+                       JavaRDD<String> out = null;
+                       Accumulator<Double> aNnz = null;
+                       
+                       if ( isInputMatrixBlock ) {
                                //piggyback nnz computation on actual write
-                               Accumulator<Double> aNnz = null;
                                if( !mc.nnzKnown() ) {
                                        aNnz = 
sec.getSparkContext().accumulator(0L);
                                        in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
-                               }
+                               }       
                                
-                               //save binary block rdd on hdfs
-                               in1.saveAsHadoopFile(fname, 
MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
+                               out = RDDConverterUtils.binaryBlockToCsv(in1, 
mc, 
+                                               (CSVFileFormatProperties) 
formatProperties, true);
+                       }
+                       else 
+                       {
+                               // This case is applicable when the CSV output 
from transform() is written out
+                               // TODO remove once transform over frames 
supported
+                               @SuppressWarnings("unchecked")
+                               JavaPairRDD<Long,String> rdd = 
(JavaPairRDD<Long, String>) 
(sec.getMatrixObject(input1.getName())).getRDDHandle().getRDD();
+                               out = rdd.values(); 
+
+                               String sep = ",";
+                               boolean hasHeader = false;
+                               if(formatProperties != null) {
+                                       sep = ((CSVFileFormatProperties) 
formatProperties).getDelim();
+                                       hasHeader = ((CSVFileFormatProperties) 
formatProperties).hasHeader();
+                               }
                                
-                               if( !mc.nnzKnown() )
-                                       
mc.setNonZeros((long)aNnz.value().longValue());
+                               if(hasHeader) {
+                                       StringBuffer buf = new StringBuffer();
+                               for(int j = 1; j < mc.getCols(); j++) {
+                                       if(j != 1) {
+                                               buf.append(sep);
+                                       }
+                                       buf.append("C" + j);
+                               }
+                               ArrayList<String> headerContainer = new 
ArrayList<String>(1);
+                               headerContainer.add(0, buf.toString());
+                               JavaRDD<String> header = 
sec.getSparkContext().parallelize(headerContainer);
+                               out = header.union(out);
+                               }
                        }
-                       else {
-                               //unsupported formats: binarycell (not 
externalized)
-                               throw new DMLRuntimeException("Unexpected data 
format: " + outFmt);
+                       
+                       customSaveTextFile(out, fname, false);
+                       
+                       if( isInputMatrixBlock && !mc.nnzKnown() )
+                               mc.setNonZeros((long)aNnz.value().longValue());
+               }
+               else if( oi == OutputInfo.BinaryBlockOutputInfo ) {
+                       //piggyback nnz computation on actual write
+                       Accumulator<Double> aNnz = null;
+                       if( !mc.nnzKnown() ) {
+                               aNnz = sec.getSparkContext().accumulator(0L);
+                               in1 = in1.mapValues(new 
ComputeBinaryBlockNnzFunction(aNnz));
                        }
                        
-                       // write meta data file
-                       MapReduceTool.writeMetaDataFile (fname + ".mtd", 
ValueType.DOUBLE, mc, oi, formatProperties);   
+                       //save binary block rdd on hdfs
+                       in1.saveAsHadoopFile(fname, MatrixIndexes.class, 
MatrixBlock.class, SequenceFileOutputFormat.class);
+                       
+                       if( !mc.nnzKnown() )
+                               mc.setNonZeros((long)aNnz.value().longValue());
                }
-               catch(IOException ex)
+               else {
+                       //unsupported formats: binarycell (not externalized)
+                       throw new DMLRuntimeException("Unexpected data format: 
" + OutputInfo.outputInfoToString(oi));
+               }
+               
+               // write meta data file
+               MapReduceTool.writeMetaDataFile (fname + ".mtd", 
ValueType.DOUBLE, mc, oi, formatProperties);   
+       }
+
+       /**
+        * 
+        * @param sec
+        * @param fname
+        * @param oi
+        * @throws DMLRuntimeException 
+        * @throws IOException 
+        */
+       @SuppressWarnings("unchecked")
+       protected void processFrameWriteInstruction(SparkExecutionContext sec, 
String fname, OutputInfo oi) 
+               throws DMLRuntimeException, IOException
+       {
+               //get input rdd
+               JavaPairRDD<Long,FrameBlock> in1 = 
(JavaPairRDD<Long,FrameBlock>)sec
+                               .getRDDHandleForVariable( input1.getName(), 
InputInfo.BinaryBlockInputInfo );
+               MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
+               
+               if( oi == OutputInfo.TextCellOutputInfo ) 
                {
-                       throw new DMLRuntimeException("Failed to process write 
instruction", ex);
+                       JavaRDD<String> out = 
FrameRDDConverterUtils.binaryBlockToTextCell(in1, mc);
+                       customSaveTextFile(out, fname, false);
+               }
+               else if( oi == OutputInfo.CSVOutputInfo ) 
+               {
+                       CSVFileFormatProperties props = 
(formatProperties!=null) ? 
+                                       (CSVFileFormatProperties) 
formatProperties : null;                                      
+                       JavaRDD<String> out = 
FrameRDDConverterUtils.binaryBlockToCsv(in1, mc, props, true);
+                       customSaveTextFile(out, fname, false);
+               }
+               else if( oi == OutputInfo.BinaryBlockOutputInfo ) 
+               {
+                       JavaPairRDD<LongWritable,FrameBlock> out = 
in1.mapToPair(new LongFrameBlockToLongWritableFrameBlock());
+                       out.saveAsHadoopFile(fname, LongWritable.class, 
FrameBlock.class, SequenceFileOutputFormat.class);
+               }
+               else {
+                       //unsupported formats: binarycell (not externalized)
+                       throw new DMLRuntimeException("Unexpected data format: 
" + OutputInfo.outputInfoToString(oi));
                }
+               
+               // write meta data file
+               MapReduceTool.writeMetaDataFile(fname + ".mtd", 
input1.getValueType(), DataType.FRAME, mc, oi, formatProperties);       
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
index b8f8e11..3c50668 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
@@ -19,37 +19,46 @@
 package org.apache.sysml.runtime.instructions.spark.functions;
 
 import java.util.ArrayList;
-import org.apache.hadoop.io.LongWritable;
+import java.util.Iterator;
+
 import org.apache.spark.api.java.function.FlatMapFunction;
 
 import scala.Tuple2;
 
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 
-public class ConvertFrameBlockToIJVLines implements 
FlatMapFunction<Tuple2<LongWritable,FrameBlock>, String> {
-
+public class ConvertFrameBlockToIJVLines implements 
FlatMapFunction<Tuple2<Long,FrameBlock>, String> 
+{
        private static final long serialVersionUID = 1803516615963340115L;
 
-       int brlen; int bclen;
-       public ConvertFrameBlockToIJVLines(int brlen, int bclen) {
-               this.brlen = brlen;
-               this.bclen = bclen;
-       }
-
        @Override
-       public Iterable<String> call(Tuple2<LongWritable, FrameBlock> kv) 
throws Exception {
-
-               long lRowIndex = kv._1.get();
+       public Iterable<String> call(Tuple2<Long, FrameBlock> kv) 
+               throws Exception 
+       {
+               long rowoffset = kv._1;
                FrameBlock block = kv._2;
                
                ArrayList<String> cells = new ArrayList<String>();
                
-               for (int i=0; i<block.getNumRows(); ++i)
-                       for (int j=0; j<block.getNumColumns(); ++j) {
-                               Object obj = block.get(i, j);
-                               if(obj != null)
-                                       cells.add(lRowIndex+i+" "+(j+1)+" 
"+obj.toString());
+               //convert frame block to list of ijv cell triples
+               StringBuilder sb = new StringBuilder();
+               Iterator<String[]> iter = block.getStringRowIterator();
+               for( int i=0; iter.hasNext(); i++ ) { //for all rows
+                       String rowIndex = Long.toString(rowoffset + i);
+                       String[] row = iter.next();
+                       for( int j=0; j<row.length; j++ ) {
+                               if( row[j] != null ) {
+                                       sb.append( rowIndex );
+                                       sb.append(' ');
+                                       sb.append( j+1 );
+                                       sb.append(' ');
+                                       sb.append( row[j] );
+                                       cells.add( sb.toString() );
+                                       sb.setLength(0); 
+                               }
                        }
+               }
+               
                return cells;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3805164..003b016 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -132,11 +132,9 @@ public class FrameRDDConverterUtils
         * @param strict
         * @return
         */
-       public static JavaRDD<String> 
binaryBlockToCsv(JavaPairRDD<LongWritable,FrameBlock> in, MatrixCharacteristics 
mcIn, CSVFileFormatProperties props, boolean strict)
+       public static JavaRDD<String> 
binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, 
CSVFileFormatProperties props, boolean strict)
        {
-               //convert input rdd to serializable long/frame block
-               JavaPairRDD<LongWritable,FrameBlock> input = 
-                               in.mapToPair(new LongWritableToSerFunction());
+               JavaPairRDD<Long,FrameBlock> input = in;
                
                //sort if required (on blocks/rows)
                if( strict ) {
@@ -152,7 +150,8 @@ public class FrameRDDConverterUtils
        
        
        //=====================================
-       // cellText <--> Binary block
+       // Text cell <--> Binary block
+       
        /**
         * 
         * @param sc
@@ -206,16 +205,19 @@ public class FrameRDDConverterUtils
                return out;
        }
 
-               
-       // Useful for printing, testing binary blocked RDD and also for 
external use.
-       public static JavaRDD<String> 
binaryBlockToStringRDD(JavaPairRDD<LongWritable, FrameBlock> input, 
MatrixCharacteristics mcIn, String format) throws DMLRuntimeException {
-               if(format.equals("text")) {
-                       JavaRDD<String> ijv = input.flatMap(new 
ConvertFrameBlockToIJVLines(mcIn.getRowsPerBlock(), mcIn.getColsPerBlock()));
-                       return ijv;
-               }
-               else {
-                       throw new DMLRuntimeException("The output format:" + 
format + " is not implemented yet.");
-               }
+       /**
+        * 
+        * @param input
+        * @param mcIn
+        * @param format
+        * @return
+        * @throws DMLRuntimeException
+        */
+       public static JavaRDD<String> binaryBlockToTextCell(JavaPairRDD<Long, 
FrameBlock> input, MatrixCharacteristics mcIn) 
+               throws DMLRuntimeException 
+       {
+               //convert frame blocks to ijv string triples  
+               return input.flatMap(new ConvertFrameBlockToIJVLines());
        }
        
        //=====================================
@@ -339,6 +341,19 @@ public class FrameRDDConverterUtils
                }
        }
        
+       /**
+        *
+        */
+       public static class LongFrameBlockToLongWritableFrameBlock implements 
PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> 
+       {
+               private static final long serialVersionUID = 
3201887196237766424L;
+
+               @Override
+               public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, 
FrameBlock> arg0) throws Exception  {
+                       return new Tuple2<LongWritable,FrameBlock>(new 
LongWritable(arg0._1), arg0._2);
+               }
+       }
+       
        
        
        /**
@@ -465,7 +480,7 @@ public class FrameRDDConverterUtils
        /**
         * 
         */
-       private static class BinaryBlockToCSVFunction implements 
FlatMapFunction<Tuple2<LongWritable,FrameBlock>,String> 
+       private static class BinaryBlockToCSVFunction implements 
FlatMapFunction<Tuple2<Long,FrameBlock>,String> 
        {
                private static final long serialVersionUID = 
8020608184930291069L;
 
@@ -476,16 +491,16 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<String> call(Tuple2<LongWritable, FrameBlock> 
arg0)
+               public Iterable<String> call(Tuple2<Long, FrameBlock> arg0)
                        throws Exception 
                {
-                       LongWritable ix = arg0._1();
+                       Long ix = arg0._1();
                        FrameBlock blk = arg0._2();
                        
                        ArrayList<String> ret = new ArrayList<String>();
                        
                        //handle header information
-                       if(_props.hasHeader() && ix.get()==1 ) {
+                       if(_props.hasHeader() && ix==1 ) {
                                StringBuilder sb = new StringBuilder();
                                for(int j = 1; j <= blk.getNumColumns(); j++) {
                                        if(j != 1)
@@ -497,14 +512,14 @@ public class FrameRDDConverterUtils
                
                        //handle Frame block data
                        StringBuilder sb = new StringBuilder();
-                       for(int i=0; i<blk.getNumRows(); i++) {
-                               for(int j=0; j<blk.getNumColumns(); j++) {
+                       Iterator<String[]> iter = blk.getStringRowIterator();
+                       while( iter.hasNext() ) {
+                               String[] row = iter.next();
+                               for(int j=0; j<row.length; j++) {
                                        if(j != 0)
                                                sb.append(_props.getDelim());
-                                       Object val = blk.get(i, j);
-                               
-                                       if(val != null)
-                                               sb.append(val);
+                                       if(row[j] != null)
+                                               sb.append(row[j]);
                                }
                                ret.add(sb.toString());
                                sb.setLength(0); //reset

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index a0056dc..97ac27c 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -37,6 +37,7 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
@@ -449,8 +450,9 @@ public class FrameConverterTest extends AutomatedTestBase
 			case BIN2CSV: {
 				InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
 				JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
+				JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new CopyFrameBlockPairFunction(false));
 				CSVFileFormatProperties fprop = new CSVFileFormatProperties();
-				JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToCsv(rddIn, mc, fprop, true);
+				JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToCsv(rddIn2, mc, fprop, true);
 				rddOut.saveAsTextFile(fnameOut);
 				break;
 			}
@@ -466,7 +468,8 @@ public class FrameConverterTest extends AutomatedTestBase
 			case BIN2TXTCELL: {
 				InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
 				JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
-				JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToStringRDD(rddIn, mc, "text");
+				JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new CopyFrameBlockPairFunction(false));
+				JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToTextCell(rddIn2, mc);
 				rddOut.saveAsTextFile(fnameOut);
 				break;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java
new file mode 100644
index 0000000..7e28640
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import java.io.IOException;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.io.FrameReader;
+import org.apache.sysml.runtime.io.FrameReaderFactory;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class FrameMatrixWriteTest extends AutomatedTestBase
+{
+	private final static String TEST_DIR = "functions/frame/";
+	private final static String TEST_NAME1 = "FrameMatrixWrite";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FrameMatrixWriteTest.class.getSimpleName() + "/";
+
+	private final static int rows = 2593;
+	private final static int cols1 = 372;
+	private final static int cols2 = 1102;
+       
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"B"}));
+	}
+       
+       @Test
+       public void testFrameWriteSingleBinaryCP() {
+               runFrameWriteTest(TEST_NAME1, false, "binary", ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteSingleTextcellCP() {
+               runFrameWriteTest(TEST_NAME1, false, "text", ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteSingleCsvCP() {
+               runFrameWriteTest(TEST_NAME1, false, "csv", ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleBinaryCP() {
+               runFrameWriteTest(TEST_NAME1, true, "binary", ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleTextcellCP() {
+               runFrameWriteTest(TEST_NAME1, true, "text", ExecType.CP);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleCsvCP() {
+               runFrameWriteTest(TEST_NAME1, true, "csv", ExecType.CP);
+       }
+
+       @Test
+       public void testFrameWriteSingleBinarySpark() {
+               runFrameWriteTest(TEST_NAME1, false, "binary", ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteSingleTextcellSpark() {
+               runFrameWriteTest(TEST_NAME1, false, "text", ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteSingleCsvSpark() {
+               runFrameWriteTest(TEST_NAME1, false, "csv", ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleBinarySpark() {
+               runFrameWriteTest(TEST_NAME1, true, "binary", ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleTextcellSpark() {
+               runFrameWriteTest(TEST_NAME1, true, "text", ExecType.SPARK);
+       }
+       
+       @Test
+       public void testFrameWriteMultipleCsvSpark() {
+               runFrameWriteTest(TEST_NAME1, true, "csv", ExecType.SPARK);
+       }
+       
+       /**
+        * 
+        * @param testname
+        * @param multColBlks
+        * @param ofmt
+        * @param et
+        */
+	private void runFrameWriteTest( String testname, boolean multColBlks, String ofmt, ExecType et)
+       {
+               //rtplatform for MR
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ){
+                       case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+                       default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+               }
+       
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
+               try
+               {
+			int cols = multColBlks ? cols2 : cols1;
+			
+			TestConfiguration config = getTestConfiguration(testname);
+			loadTestConfiguration(config);
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + testname + ".dml";
+			programArgs = new String[]{"-explain","-args", String.valueOf(rows), 
+					String.valueOf(cols), output("B"), ofmt };
+                       
+                       //run testcase
+                       runTest(true, false, null, -1);
+                       
+                       //generate compare data
+                       double[][] A = new double[rows][cols];
+                       for( int i=0; i<rows; i++ )
+                               for( int j=0; j<cols; j++ )
+                                       A[i][j] = (i+1)+(j+1);
+                       
+                       //compare matrices
+			double[][] B = readFrameInput(output("B"), ofmt, rows, cols);
+                       TestUtils.compareMatrices(A, B, rows, cols, 0);
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+               }
+       }
+       
+       /**
+        * 
+        * @param fname
+        * @param ofmt
+        * @param rows
+        * @param cols
+        * @return
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */
+	private double[][] readFrameInput(String fname, String ofmt, int rows, int cols) 
+		throws DMLRuntimeException, IOException 
+	{
+		//read input data
+		FrameReader reader = FrameReaderFactory.createFrameReader(InputInfo.stringExternalToInputInfo(ofmt));
+		FrameBlock fb = reader.readFrameFromHDFS(fname, rows, cols);
+		MatrixBlock ret = DataConverter.convertToMatrixBlock(fb);
+		
+		return DataConverter.convertToDoubleMatrix(ret);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/test/scripts/functions/frame/FrameMatrixWrite.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameMatrixWrite.dml b/src/test/scripts/functions/frame/FrameMatrixWrite.dml
new file mode 100644
index 0000000..3cd8f7c
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameMatrixWrite.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = matrix(1, rows=$1, cols=$2);
+A = A * seq(1,nrow(A)) + t(seq(1,ncol(A)));
+
+B = as.frame(A);
+write(B, $3, format=$4);

Reply via email to