Repository: incubator-systemml Updated Branches: refs/heads/master cf4e5ab6e -> c76b01a75
[SYSTEMML-562] Spark frame write instruction (binary/text/csv), tests Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c76b01a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c76b01a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c76b01a7 Branch: refs/heads/master Commit: c76b01a753837150c590c79557acdccb9d756a7e Parents: cf4e5ab Author: Matthias Boehm <[email protected]> Authored: Wed Jun 8 23:24:18 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Jun 9 18:00:50 2016 -0700 ---------------------------------------------------------------------- src/main/java/org/apache/sysml/hops/DataOp.java | 5 +- .../instructions/spark/WriteSPInstruction.java | 257 ++++++++++++------- .../functions/ConvertFrameBlockToIJVLines.java | 43 ++-- .../spark/utils/FrameRDDConverterUtils.java | 65 +++-- .../functions/frame/FrameConverterTest.java | 7 +- .../functions/frame/FrameMatrixWriteTest.java | 193 ++++++++++++++ .../functions/frame/FrameMatrixWrite.dml | 26 ++ 7 files changed, 454 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/hops/DataOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/DataOp.java b/src/main/java/org/apache/sysml/hops/DataOp.java index d8e5519..b4dbae4 100644 --- a/src/main/java/org/apache/sysml/hops/DataOp.java +++ b/src/main/java/org/apache/sysml/hops/DataOp.java @@ -37,7 +37,6 @@ import org.apache.sysml.runtime.util.LocalFileUtils; public class DataOp extends Hop { - private DataOpTypes _dataop; private String _fileName = null; @@ -436,8 +435,8 @@ public class DataOp extends Hop { checkAndSetForcedPlatform(); - //additional check for write only (TODO: remove frame here once support for distributed) - if( getDataType()==DataType.SCALAR || getDataType()==DataType.FRAME ) + //additional check for write only + if( getDataType()==DataType.SCALAR || (getDataType()==DataType.FRAME && REMOTE==ExecType.MR) ) _etypeForced = ExecType.CP; if( _etypeForced != null ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java index 3ea03ef..682e0b7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java @@ -23,11 +23,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Random; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; - +import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; @@ -36,11 +37,15 @@ import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction; import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; +import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameBlockToLongWritableFrameBlock; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FileFormatProperties; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; @@ -54,6 +59,7 @@ public class WriteSPInstruction extends SPInstruction private FileFormatProperties formatProperties; //scalars might occur for transform + // TODO remove once transform over frames supported private boolean isInputMatrixBlock = true; public WriteSPInstruction(String opcode, String istr) { @@ -87,10 +93,9 @@ public class WriteSPInstruction extends SPInstruction //SPARK°write°_mVar2·MATRIX·DOUBLE°./src/test/scripts/functions/data/out/B·SCALAR·STRING·true°matrixmarket·SCALAR·STRING·true // _mVar2·MATRIX·DOUBLE - CPOperand in1=null, in2=null, in3=null; - in1 = new CPOperand(parts[1]); - in2 = new CPOperand(parts[2]); - in3 = new CPOperand(parts[3]); + CPOperand in1 = new CPOperand(parts[1]); + CPOperand in2 = new CPOperand(parts[2]); + CPOperand in3 = new CPOperand(parts[3]); WriteSPInstruction inst = new WriteSPInstruction(in1, in2, in3, opcode, str); @@ -141,110 +146,172 @@ public class WriteSPInstruction extends SPInstruction //prepare output info according to meta data String outFmt = input3.getName(); OutputInfo oi = OutputInfo.stringToOutputInfo(outFmt); - - //get input rdd - JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); - MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName()); - if( oi == OutputInfo.MatrixMarketOutputInfo - || oi == OutputInfo.TextCellOutputInfo ) - { - //recompute nnz if necessary (required for header if matrix market) - if ( isInputMatrixBlock && !mc.nnzKnown() ) - mc.setNonZeros( SparkUtils.computeNNZFromBlocks(in1) ); - - JavaRDD<String> header = null; - if(outFmt.equalsIgnoreCase("matrixmarket")) { - ArrayList<String> headerContainer = new ArrayList<String>(1); - // First output MM header - String headerStr = "%%MatrixMarket matrix coordinate real general\n" + - // output number of rows, number of columns and number of nnz - mc.getRows() + " " + mc.getCols() + " " + mc.getNonZeros(); - headerContainer.add(headerStr); - header = sec.getSparkContext().parallelize(headerContainer); - } - - JavaRDD<String> ijv = in1.flatMap(new ConvertMatrixBlockToIJVLines(mc.getRowsPerBlock(), mc.getColsPerBlock())); - if(header != null) - customSaveTextFile(header.union(ijv), fname, true); - else - customSaveTextFile(ijv, fname, false); - } - else if( oi == OutputInfo.CSVOutputInfo ) - { - JavaRDD<String> out = null; - Accumulator<Double> aNnz = null; - - if ( isInputMatrixBlock ) { - //piggyback nnz computation on actual write - if( !mc.nnzKnown() ) { - aNnz = sec.getSparkContext().accumulator(0L); - in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); - } - - out = RDDConverterUtils.binaryBlockToCsv(in1, mc, - (CSVFileFormatProperties) formatProperties, true); - } - else - { - // This case is applicable when the CSV output from transform() is written out - @SuppressWarnings("unchecked") - JavaPairRDD<Long,String> rdd = (JavaPairRDD<Long, String>) (sec.getMatrixObject(input1.getName())).getRDDHandle().getRDD(); - out = rdd.values(); - - String sep = ","; - boolean hasHeader = false; - if(formatProperties != null) { - sep = ((CSVFileFormatProperties) formatProperties).getDelim(); - hasHeader = ((CSVFileFormatProperties) formatProperties).hasHeader(); - } - - if(hasHeader) { - StringBuffer buf = new StringBuffer(); - for(int j = 1; j < mc.getCols(); j++) { - if(j != 1) { - buf.append(sep); - } - buf.append("C" + j); - } - ArrayList<String> headerContainer = new ArrayList<String>(1); - headerContainer.add(0, buf.toString()); - JavaRDD<String> header = sec.getSparkContext().parallelize(headerContainer); - out = header.union(out); - } - } - - customSaveTextFile(out, fname, false); - - if( isInputMatrixBlock && !mc.nnzKnown() ) - mc.setNonZeros((long)aNnz.value().longValue()); + //core matrix/frame write + if( input1.getDataType()==DataType.MATRIX ) + processMatrixWriteInstruction(sec, fname, oi); + else + processFrameWriteInstruction(sec, fname, oi); + } + catch(IOException ex) + { + throw new DMLRuntimeException("Failed to process write instruction", ex); + } + } + + /** + * + * @param sec + * @param fname + * @param oi + * @throws DMLRuntimeException + * @throws IOException + */ + protected void processMatrixWriteInstruction(SparkExecutionContext sec, String fname, OutputInfo oi) + throws DMLRuntimeException, IOException + { + //get input rdd + JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = sec.getBinaryBlockRDDHandleForVariable( input1.getName() ); + MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName()); + + if( oi == OutputInfo.MatrixMarketOutputInfo + || oi == OutputInfo.TextCellOutputInfo ) + { + //recompute nnz if necessary (required for header if matrix market) + if ( isInputMatrixBlock && !mc.nnzKnown() ) + mc.setNonZeros( SparkUtils.computeNNZFromBlocks(in1) ); + + JavaRDD<String> header = null; + if( oi == OutputInfo.MatrixMarketOutputInfo ) { + ArrayList<String> headerContainer = new ArrayList<String>(1); + // First output MM header + String headerStr = "%%MatrixMarket matrix coordinate real general\n" + + // output number of rows, number of columns and number of nnz + mc.getRows() + " " + mc.getCols() + " " + mc.getNonZeros(); + headerContainer.add(headerStr); + header = sec.getSparkContext().parallelize(headerContainer); } - else if( oi == OutputInfo.BinaryBlockOutputInfo ) { + + JavaRDD<String> ijv = in1.flatMap(new ConvertMatrixBlockToIJVLines(mc.getRowsPerBlock(), mc.getColsPerBlock())); + if(header != null) + customSaveTextFile(header.union(ijv), fname, true); + else + customSaveTextFile(ijv, fname, false); + } + else if( oi == OutputInfo.CSVOutputInfo ) + { + JavaRDD<String> out = null; + Accumulator<Double> aNnz = null; + + if ( isInputMatrixBlock ) { //piggyback nnz computation on actual write - Accumulator<Double> aNnz = null; if( !mc.nnzKnown() ) { aNnz = sec.getSparkContext().accumulator(0L); in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); - } + } - //save binary block rdd on hdfs - in1.saveAsHadoopFile(fname, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class); + out = RDDConverterUtils.binaryBlockToCsv(in1, mc, + (CSVFileFormatProperties) formatProperties, true); + } + else + { + // This case is applicable when the CSV output from transform() is written out + // TODO remove once transform over frames supported + @SuppressWarnings("unchecked") + JavaPairRDD<Long,String> rdd = (JavaPairRDD<Long, String>) (sec.getMatrixObject(input1.getName())).getRDDHandle().getRDD(); + out = rdd.values(); + + String sep = ","; + boolean hasHeader = false; + if(formatProperties != null) { + sep = ((CSVFileFormatProperties) formatProperties).getDelim(); + hasHeader = ((CSVFileFormatProperties) formatProperties).hasHeader(); + } - if( !mc.nnzKnown() ) - mc.setNonZeros((long)aNnz.value().longValue()); + if(hasHeader) { + StringBuffer buf = new StringBuffer(); + for(int j = 1; j < mc.getCols(); j++) { + if(j != 1) { + buf.append(sep); + } + buf.append("C" + j); + } + ArrayList<String> headerContainer = new ArrayList<String>(1); + headerContainer.add(0, buf.toString()); + JavaRDD<String> header = sec.getSparkContext().parallelize(headerContainer); + out = header.union(out); + } } - else { - //unsupported formats: binarycell (not externalized) - throw new DMLRuntimeException("Unexpected data format: " + outFmt); + + customSaveTextFile(out, fname, false); + + if( isInputMatrixBlock && !mc.nnzKnown() ) + mc.setNonZeros((long)aNnz.value().longValue()); + } + else if( oi == OutputInfo.BinaryBlockOutputInfo ) { + //piggyback nnz computation on actual write + Accumulator<Double> aNnz = null; + if( !mc.nnzKnown() ) { + aNnz = sec.getSparkContext().accumulator(0L); + in1 = in1.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); } - // write meta data file - MapReduceTool.writeMetaDataFile (fname + ".mtd", ValueType.DOUBLE, mc, oi, formatProperties); + //save binary block rdd on hdfs + in1.saveAsHadoopFile(fname, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class); + + if( !mc.nnzKnown() ) + mc.setNonZeros((long)aNnz.value().longValue()); } - catch(IOException ex) + else { + //unsupported formats: binarycell (not externalized) + throw new DMLRuntimeException("Unexpected data format: " + OutputInfo.outputInfoToString(oi)); + } + + // write meta data file + MapReduceTool.writeMetaDataFile (fname + ".mtd", ValueType.DOUBLE, mc, oi, formatProperties); + } + + /** + * + * @param sec + * @param fname + * @param oi + * @throws DMLRuntimeException + * @throws IOException + */ + @SuppressWarnings("unchecked") + protected void processFrameWriteInstruction(SparkExecutionContext sec, String fname, OutputInfo oi) + throws DMLRuntimeException, IOException + { + //get input rdd + JavaPairRDD<Long,FrameBlock> in1 = (JavaPairRDD<Long,FrameBlock>)sec + .getRDDHandleForVariable( input1.getName(), InputInfo.BinaryBlockInputInfo ); + MatrixCharacteristics mc = sec.getMatrixCharacteristics(input1.getName()); + + if( oi == OutputInfo.TextCellOutputInfo ) { - throw new DMLRuntimeException("Failed to process write instruction", ex); + JavaRDD<String> out = FrameRDDConverterUtils.binaryBlockToTextCell(in1, mc); + customSaveTextFile(out, fname, false); + } + else if( oi == OutputInfo.CSVOutputInfo ) + { + CSVFileFormatProperties props = (formatProperties!=null) ? + (CSVFileFormatProperties) formatProperties : null; + JavaRDD<String> out = FrameRDDConverterUtils.binaryBlockToCsv(in1, mc, props, true); + customSaveTextFile(out, fname, false); + } + else if( oi == OutputInfo.BinaryBlockOutputInfo ) + { + JavaPairRDD<LongWritable,FrameBlock> out = in1.mapToPair(new LongFrameBlockToLongWritableFrameBlock()); + out.saveAsHadoopFile(fname, LongWritable.class, FrameBlock.class, SequenceFileOutputFormat.class); + } + else { + //unsupported formats: binarycell (not externalized) + throw new DMLRuntimeException("Unexpected data format: " + OutputInfo.outputInfoToString(oi)); } + + // write meta data file + MapReduceTool.writeMetaDataFile(fname + ".mtd", input1.getValueType(), DataType.FRAME, mc, oi, formatProperties); } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java index b8f8e11..3c50668 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java @@ -19,37 +19,46 @@ package org.apache.sysml.runtime.instructions.spark.functions; import java.util.ArrayList; -import org.apache.hadoop.io.LongWritable; +import java.util.Iterator; + import org.apache.spark.api.java.function.FlatMapFunction; import scala.Tuple2; import org.apache.sysml.runtime.matrix.data.FrameBlock; -public class ConvertFrameBlockToIJVLines implements FlatMapFunction<Tuple2<LongWritable,FrameBlock>, String> { - +public class ConvertFrameBlockToIJVLines implements FlatMapFunction<Tuple2<Long,FrameBlock>, String> +{ private static final long serialVersionUID = 1803516615963340115L; - int brlen; int bclen; - public ConvertFrameBlockToIJVLines(int brlen, int bclen) { - this.brlen = brlen; - this.bclen = bclen; - } - @Override - public Iterable<String> call(Tuple2<LongWritable, FrameBlock> kv) throws Exception { - - long lRowIndex = kv._1.get(); + public Iterable<String> call(Tuple2<Long, FrameBlock> kv) + throws Exception + { + long rowoffset = kv._1; FrameBlock block = kv._2; ArrayList<String> cells = new ArrayList<String>(); - for (int i=0; i<block.getNumRows(); ++i) - for (int j=0; j<block.getNumColumns(); ++j) { - Object obj = block.get(i, j); - if(obj != null) - cells.add(lRowIndex+i+" "+(j+1)+" "+obj.toString()); + //convert frame block to list of ijv cell triples + StringBuilder sb = new StringBuilder(); + Iterator<String[]> iter = block.getStringRowIterator(); + for( int i=0; iter.hasNext(); i++ ) { //for all rows + String rowIndex = Long.toString(rowoffset + i); + String[] row = iter.next(); + for( int j=0; j<row.length; j++ ) { + if( row[j] != null ) { + sb.append( rowIndex ); + sb.append(' '); + sb.append( j+1 ); + sb.append(' '); + sb.append( row[j] ); + cells.add( sb.toString() ); + sb.setLength(0); + } } + } + return cells; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 3805164..003b016 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -132,11 +132,9 @@ public class FrameRDDConverterUtils * @param strict * @return */ - public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<LongWritable,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) + public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) { - //convert input rdd to serializable long/frame block - JavaPairRDD<LongWritable,FrameBlock> input = - in.mapToPair(new LongWritableToSerFunction()); + JavaPairRDD<Long,FrameBlock> input = in; //sort if required (on blocks/rows) if( strict ) { @@ -152,7 +150,8 @@ public class FrameRDDConverterUtils //===================================== - // cellText <--> Binary block + // Text cell <--> Binary block + /** * * @param sc @@ -206,16 +205,19 @@ public class FrameRDDConverterUtils return out; } - - // Useful for printing, testing binary blocked RDD and also for external use. - public static JavaRDD<String> binaryBlockToStringRDD(JavaPairRDD<LongWritable, FrameBlock> input, MatrixCharacteristics mcIn, String format) throws DMLRuntimeException { - if(format.equals("text")) { - JavaRDD<String> ijv = input.flatMap(new ConvertFrameBlockToIJVLines(mcIn.getRowsPerBlock(), mcIn.getColsPerBlock())); - return ijv; - } - else { - throw new DMLRuntimeException("The output format:" + format + " is not implemented yet."); - } + /** + * + * @param input + * @param mcIn + * @param format + * @return + * @throws DMLRuntimeException + */ + public static JavaRDD<String> binaryBlockToTextCell(JavaPairRDD<Long, FrameBlock> input, MatrixCharacteristics mcIn) + throws DMLRuntimeException + { + //convert frame blocks to ijv string triples + return input.flatMap(new ConvertFrameBlockToIJVLines()); } //===================================== @@ -339,6 +341,19 @@ public class FrameRDDConverterUtils } } + /** + * + */ + public static class LongFrameBlockToLongWritableFrameBlock implements PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> + { + private static final long serialVersionUID = 3201887196237766424L; + + @Override + public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, FrameBlock> arg0) throws Exception { + return new Tuple2<LongWritable,FrameBlock>(new LongWritable(arg0._1), arg0._2); + } + } + /** @@ -465,7 +480,7 @@ public class FrameRDDConverterUtils /** * */ - private static class BinaryBlockToCSVFunction implements FlatMapFunction<Tuple2<LongWritable,FrameBlock>,String> + private static class BinaryBlockToCSVFunction implements FlatMapFunction<Tuple2<Long,FrameBlock>,String> { private static final long serialVersionUID = 8020608184930291069L; @@ -476,16 +491,16 @@ public class FrameRDDConverterUtils } @Override - public Iterable<String> call(Tuple2<LongWritable, FrameBlock> arg0) + public Iterable<String> call(Tuple2<Long, FrameBlock> arg0) throws Exception { - LongWritable ix = arg0._1(); + Long ix = arg0._1(); FrameBlock blk = arg0._2(); ArrayList<String> ret = new ArrayList<String>(); //handle header information - if(_props.hasHeader() && ix.get()==1 ) { + if(_props.hasHeader() && ix==1 ) { StringBuilder sb = new StringBuilder(); for(int j = 1; j <= blk.getNumColumns(); j++) { if(j != 1) @@ -497,14 +512,14 @@ public class FrameRDDConverterUtils //handle Frame block data StringBuilder sb = new StringBuilder(); - for(int i=0; i<blk.getNumRows(); i++) { - for(int j=0; j<blk.getNumColumns(); j++) { + Iterator<String[]> iter = blk.getStringRowIterator(); + while( iter.hasNext() ) { + String[] row = iter.next(); + for(int j=0; j<row.length; j++) { if(j != 0) sb.append(_props.getDelim()); - Object val = blk.get(i, j); - - if(val != null) - sb.append(val); + if(row[j] != null) + sb.append(row[j]); } ret.add(sb.toString()); sb.setLength(0); //reset http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index a0056dc..97ac27c 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -37,6 +37,7 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; @@ -449,8 +450,9 @@ public class FrameConverterTest extends AutomatedTestBase case BIN2CSV: { InputInfo iinfo = InputInfo.BinaryBlockInputInfo; JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class); + JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new CopyFrameBlockPairFunction(false)); CSVFileFormatProperties fprop = new CSVFileFormatProperties(); - JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToCsv(rddIn, mc, fprop, true); + JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToCsv(rddIn2, mc, fprop, true); rddOut.saveAsTextFile(fnameOut); break; } @@ -466,7 +468,8 @@ public class FrameConverterTest extends AutomatedTestBase case BIN2TXTCELL: { InputInfo iinfo = InputInfo.BinaryBlockInputInfo; JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class); - JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToStringRDD(rddIn, mc, "text"); + JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new CopyFrameBlockPairFunction(false)); + JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToTextCell(rddIn2, mc); rddOut.saveAsTextFile(fnameOut); break; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java new file mode 100644 index 0000000..7e28640 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMatrixWriteTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.frame; + +import java.io.IOException; + +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.FrameReader; +import org.apache.sysml.runtime.io.FrameReaderFactory; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; +import org.junit.Test; + +/** + * + */ +public class FrameMatrixWriteTest extends AutomatedTestBase +{ + private final static String TEST_DIR = "functions/frame/"; + private final static String TEST_NAME1 = "FrameMatrixWrite"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameMatrixWriteTest.class.getSimpleName() + "/"; + + private final static int rows = 2593; + private final static int cols1 = 372; + private final static int cols2 = 1102; + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"B"})); + } + + @Test + public void testFrameWriteSingleBinaryCP() { + runFrameWriteTest(TEST_NAME1, false, "binary", ExecType.CP); + } + + @Test + public void testFrameWriteSingleTextcellCP() { + runFrameWriteTest(TEST_NAME1, false, "text", ExecType.CP); + } + + @Test + public void testFrameWriteSingleCsvCP() { + runFrameWriteTest(TEST_NAME1, false, "csv", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleBinaryCP() { + runFrameWriteTest(TEST_NAME1, true, "binary", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleTextcellCP() { + runFrameWriteTest(TEST_NAME1, true, "text", ExecType.CP); + } + + @Test + public void testFrameWriteMultipleCsvCP() { + runFrameWriteTest(TEST_NAME1, true, "csv", ExecType.CP); + } + + @Test + public void testFrameWriteSingleBinarySpark() { + runFrameWriteTest(TEST_NAME1, false, "binary", ExecType.SPARK); + } + + @Test + public void testFrameWriteSingleTextcellSpark() { + runFrameWriteTest(TEST_NAME1, false, "text", ExecType.SPARK); + } + + @Test + public void testFrameWriteSingleCsvSpark() { + runFrameWriteTest(TEST_NAME1, false, "csv", ExecType.SPARK); + } + + @Test + public void testFrameWriteMultipleBinarySpark() { + runFrameWriteTest(TEST_NAME1, true, "binary", ExecType.SPARK); + } + + @Test + public void testFrameWriteMultipleTextcellSpark() { + runFrameWriteTest(TEST_NAME1, true, "text", ExecType.SPARK); + } + + @Test + public void testFrameWriteMultipleCsvSpark() { + runFrameWriteTest(TEST_NAME1, true, "csv", ExecType.SPARK); + } + + /** + * + * @param testname + * @param multColBlks + * @param ofmt + * @param et + */ + private void runFrameWriteTest( String testname, boolean multColBlks, String ofmt, ExecType et) + { + //rtplatform for MR + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ){ + case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break; + default: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + try + { + int cols = multColBlks ? cols2 : cols1; + + TestConfiguration config = getTestConfiguration(testname); + loadTestConfiguration(config); + + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + testname + ".dml"; + programArgs = new String[]{"-explain","-args", String.valueOf(rows), + String.valueOf(cols), output("B"), ofmt }; + + //run testcase + runTest(true, false, null, -1); + + //generate compare data + double[][] A = new double[rows][cols]; + for( int i=0; i<rows; i++ ) + for( int j=0; j<cols; j++ ) + A[i][j] = (i+1)+(j+1); + + //compare matrices + double[][] B = readFrameInput(output("B"), ofmt, rows, cols); + TestUtils.compareMatrices(A, B, rows, cols, 0); + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + finally { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } + + /** + * + * @param fname + * @param ofmt + * @param rows + * @param cols + * @return + * @throws DMLRuntimeException + * @throws IOException + */ + private double[][] readFrameInput(String fname, String ofmt, int rows, int cols) + throws DMLRuntimeException, IOException + { + //read input data + FrameReader reader = FrameReaderFactory.createFrameReader(InputInfo.stringExternalToInputInfo(ofmt)); + FrameBlock fb = reader.readFrameFromHDFS(fname, rows, cols); + MatrixBlock ret = DataConverter.convertToMatrixBlock(fb); + + return DataConverter.convertToDoubleMatrix(ret); + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c76b01a7/src/test/scripts/functions/frame/FrameMatrixWrite.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/functions/frame/FrameMatrixWrite.dml b/src/test/scripts/functions/frame/FrameMatrixWrite.dml new file mode 100644 index 0000000..3cd8f7c --- /dev/null +++ b/src/test/scripts/functions/frame/FrameMatrixWrite.dml @@ -0,0 +1,26 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +A = matrix(1, rows=$1, cols=$2); +A = A * seq(1,nrow(A)) + t(seq(1,ncol(A))); + +B = as.frame(A); +write(B, $3, format=$4);
