Repository: incubator-systemml Updated Branches: refs/heads/master 821c5f50d -> 14e9f6443
[SYSTEMML-560] Frame DataFrame converters Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/14e9f644 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/14e9f644 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/14e9f644 Branch: refs/heads/master Commit: 14e9f64439ce58acd3b3cdaf53e75a768a4757f0 Parents: 821c5f5 Author: Arvind Surve <[email protected]> Authored: Fri Jul 22 22:34:27 2016 -0700 Committer: Arvind Surve <[email protected]> Committed: Fri Jul 22 22:34:27 2016 -0700 ---------------------------------------------------------------------- .../spark/utils/FrameRDDConverterUtils.java | 214 +++++++++++++++++-- .../apache/sysml/runtime/io/FrameReader.java | 2 +- .../runtime/matrix/data/LibMatrixBincell.java | 2 +- .../runtime/matrix/data/LibMatrixOuterAgg.java | 26 +-- .../sysml/runtime/matrix/data/MatrixBlock.java | 4 +- .../matrix/sort/SamplingSortMRInputFormat.java | 4 +- .../sysml/runtime/util/DataConverter.java | 4 +- .../sysml/runtime/util/UtilFunctions.java | 44 ++++ .../functions/frame/FrameAppendDistTest.java | 2 +- .../functions/frame/FrameConverterTest.java | 89 +++++++- .../functions/frame/FrameCopyTest.java | 2 +- .../functions/frame/FrameIndexingDistTest.java | 2 +- .../functions/frame/FrameReadWriteTest.java | 2 +- 13 files changed, 349 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 052d3f3..c243eeb 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -36,6 +36,13 @@ import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import scala.Tuple2; @@ -101,8 +108,7 @@ public class FrameRDDConverterUtils //convert csv rdd to binary block rdd (w/ partial blocks) JavaPairRDD<Long, FrameBlock> out = prepinput - .mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill)) - .mapToPair(new LongWritableFrameToLongFrameFunction()); + .mapPartitionsToPair(new CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill)); return out; } @@ -300,8 +306,70 @@ public class FrameRDDConverterUtils return out; } + //===================================== + // DataFrame <--> Binary block + /** + * + * @param sc + * @param input + * @param mcOut + * @param hasHeader + * @param delim + * @param fill + * @param missingValue + * @return + * @throws DMLRuntimeException + */ + public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc, + DataFrame df, MatrixCharacteristics mcOut, boolean containsID) + throws DMLRuntimeException + { + + if(containsID) + df = df.drop("ID"); + + //determine unknown dimensions if required + if( !mcOut.dimsKnown(true) ) { + JavaRDD<Row> tmp = df.javaRDD(); + long rlen = tmp.count(); + long clen = containsID ? (df.columns().length - 1) : df.columns().length; + mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); + } + + JavaPairRDD<Row, Long> prepinput = df.javaRDD() + .zipWithIndex(); //zip row index + + //convert rdd to binary block rdd + JavaPairRDD<Long, FrameBlock> out = prepinput + .mapPartitionsToPair(new DataFrameToBinaryBlockFunction(mcOut)); + + return out; + } + + /** + * + * @param in + * @param mcIn + * @param props + * @param strict + * @return + */ + public static DataFrame binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, JavaSparkContext sc) + { + List<ValueType> schema = in.first()._2().getSchema(); + + //convert binary block to rows rdd (from blocks/rows) + JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction()); + + SQLContext sqlContext = new SQLContext(sc); + StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema); + DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema); + + return df; + } + ///////////////////////////////// // CSV-SPECIFIC FUNCTIONS @@ -391,7 +459,7 @@ public class FrameRDDConverterUtils * In terms of memory consumption this is better than creating partial blocks of row segments. * */ - private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,LongWritable,FrameBlock> + private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,Long,FrameBlock> { private static final long serialVersionUID = -1976803898174960086L; @@ -413,12 +481,12 @@ public class FrameRDDConverterUtils } @Override - public Iterable<Tuple2<LongWritable, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) + public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) throws Exception { - ArrayList<Tuple2<LongWritable,FrameBlock>> ret = new ArrayList<Tuple2<LongWritable,FrameBlock>>(); + ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); - LongWritable[] ix = new LongWritable[1]; + Long[] ix = new Long[1]; FrameBlock[] mb = new FrameBlock[1]; int iRowsInBlock = 0; @@ -467,10 +535,10 @@ public class FrameRDDConverterUtils } // Creates new state of empty column blocks for current global row index. - private void createBlocks(long rowix, LongWritable[] ix, FrameBlock[] mb) + private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb) { //compute row block index and number of column blocks - ix[0] = new LongWritable(rowix); + ix[0] = rowix; mb[0] = new FrameBlock((int)_clen, ValueType.STRING); if( _colnames != null ) mb[0].setColumnNames(_colnames); @@ -481,17 +549,6 @@ public class FrameRDDConverterUtils for( int j=0; j<_clen; j++ ) mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j))); } - - // Flushes current state of filled column blocks to output list. - private void flushBlocksToList( LongWritable[] ix, FrameBlock[] mb, ArrayList<Tuple2<LongWritable,FrameBlock>> ret ) - throws DMLRuntimeException - { - int len = ix.length; - for( int i=0; i<len; i++ ) - if( mb[i] != null ) { - ret.add(new Tuple2<LongWritable,FrameBlock>(ix[i],mb[i])); - } - } } /** @@ -558,6 +615,111 @@ public class FrameRDDConverterUtils return ret; } } + + ///////////////////////////////// + // DataFrame-SPECIFIC FUNCTIONS + + private static class DataFrameToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,Long,FrameBlock> + { + private static final long serialVersionUID = 2269315691094111843L; + + private long _clen = -1; + private int _maxRowsPerBlock = -1; + + public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) { + _clen = mc.getCols(); + _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1); + } + + @Override + public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception { + ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); + + Long[] ix = new Long[1]; + FrameBlock[] mb = new FrameBlock[1]; + int iRowsInBlock = 0; + + while( arg0.hasNext() ) + { + Tuple2<Row,Long> tmp = arg0.next(); + Row row = tmp._1(); + long rowix = tmp._2()+1; + + if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) { + if( iRowsInBlock == _maxRowsPerBlock ) + flushBlocksToList(ix, mb, ret); + createBlocks(rowix, ix, mb, row); + iRowsInBlock = 0; + } + + //process row data + Object[] parts = rowToObjectArray(row, (int)_clen, mb[0].getSchema()); + mb[0].appendRow(parts); + iRowsInBlock++; + } + + //flush last blocks + flushBlocksToList(ix, mb, ret); + + return ret; + } + + public Object[] rowToObjectArray(Row row, int _clen, List<ValueType> schema) throws Exception { + Object[] ret = new Object[_clen]; + for(int i = 0; i < row.length(); i++) + ret[i] = UtilFunctions.objectToObject(schema.get(i), row.get(i)); + for(int i=row.length(); i<_clen; i++) + ret[i] = ""; + return ret; + } + + // Creates new state of empty column blocks for current global row index. + private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb, Row row) + { + //compute row block index and number of column blocks + ix[0] = new Long(rowix); + + List<String> columns = new ArrayList<String>(); + List<ValueType> schema = new ArrayList<ValueType>(); + for (StructField structType: row.schema().fields()) { + columns.add(structType.name()); + if(structType.dataType() == DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType) + schema.add(ValueType.DOUBLE); + else if(structType.dataType() == DataTypes.LongType || structType.dataType() == DataTypes.IntegerType) + schema.add(ValueType.INT); + else if(structType.dataType() == DataTypes.BooleanType) + schema.add(ValueType.BOOLEAN); + else + schema.add(ValueType.STRING); + } + mb[0] = new FrameBlock(schema); + mb[0].setColumnNames(columns); + } + } + + /** + * + */ + private static class BinaryBlockToDataFrameFunction implements FlatMapFunction<Tuple2<Long,FrameBlock>,Row> + { + private static final long serialVersionUID = 8093340778966667460L; + + @Override + public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0) + throws Exception + { + FrameBlock blk = arg0._2(); + ArrayList<Row> ret = new ArrayList<Row>(); + + //handle Frame block data + Iterator<Object[]> iter = blk.getObjectRowIterator(); + while( iter.hasNext() ) + ret.add(RowFactory.create(iter.next().clone())); + + return ret; + } + } + ///////////////////////////////// // TEXTCELL-SPECIFIC FUNCTIONS @@ -808,4 +970,18 @@ public class FrameRDDConverterUtils return ret; } } + + ////////////////////////////////////// + // Common functions + + // Flushes current state of filled column blocks to output list. + private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, ArrayList<Tuple2<Long,FrameBlock>> ret ) + throws DMLRuntimeException + { + int len = ix.length; + for( int i=0; i<len; i++ ) + if( mb[i] != null ) { + ret.add(new Tuple2<Long,FrameBlock>(ix[i],mb[i])); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/io/FrameReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java index d37bbde..e318fff 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java @@ -104,7 +104,7 @@ public abstract class FrameReader throws IOException, DMLRuntimeException { List<String> colNames = new ArrayList<String>(); - for (int i=0; i < clen; ++i) + for (int i=0; i < clen; i++) colNames.add("C"+i); return colNames; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java index dd0b9e0..6ad08d8 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java @@ -781,7 +781,7 @@ public class LibMatrixBincell if( ixPos1 >= 0 ){ //match, scan to next val if(bOp.fn instanceof LessThan || bOp.fn instanceof GreaterThanEquals || bOp.fn instanceof Equals || bOp.fn instanceof NotEquals) - while( ixPos1<bv.length && value==bv[ixPos1] ) ++ixPos1; + while( ixPos1<bv.length && value==bv[ixPos1] ) ixPos1++; if(bOp.fn instanceof GreaterThan || bOp.fn instanceof LessThanEquals || bOp.fn instanceof Equals || bOp.fn instanceof NotEquals) while( ixPos2 > 0 && value==bv[ixPos2-1]) --ixPos2; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java index dab24a0..25d3067 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java @@ -184,10 +184,10 @@ public class LibMatrixOuterAgg double dvix[] = new double[vix.length]; if (bPrimeCumSum) - for (int i = 0; i< vix.length; ++i) + for (int i = 0; i< vix.length; i++) dvix[vix.length-1-i] = vix[i]; else - for (int i = 0; i< vix.length; ++i) + for (int i = 0; i< vix.length; i++) dvix[i] = vix[i]; MatrixBlock mbix = DataConverter.convertToMatrixBlock(dvix, true); @@ -197,7 +197,7 @@ public class LibMatrixOuterAgg vixCumSum = DataConverter.convertToIntVector(mbResult); if (bPrimeCumSum) - for (int i = 0; i< (vixCumSum.length+1)/2; ++i) { + for (int i = 0; i< (vixCumSum.length+1)/2; i++) { int iTemp = vixCumSum[vixCumSum.length-1-i]; vixCumSum[vixCumSum.length-1-i] = vixCumSum[i]; vixCumSum[i] = iTemp; @@ -264,10 +264,10 @@ public class LibMatrixOuterAgg double dvix[] = new double[vix.length]; if (bPrimeCumSum) - for (int i = 0; i< vix.length; ++i) + for (int i = 0; i< vix.length; i++) dvix[vix.length-1-i] = vix[i]; else - for (int i = 0; i< vix.length; ++i) + for (int i = 0; i< vix.length; i++) dvix[i] = vix[i]; MatrixBlock mbix = DataConverter.convertToMatrixBlock(dvix, true); @@ -277,7 +277,7 @@ public class LibMatrixOuterAgg vixCumSum = DataConverter.convertToIntVector(mbResult); if (bPrimeCumSum) - for (int i = 0; i< (vixCumSum.length+1)/2; ++i) { + for (int i = 0; i< (vixCumSum.length+1)/2; i++) { int iTemp = vixCumSum[vixCumSum.length-1-i]; vixCumSum[vixCumSum.length-1-i] = vixCumSum[i]; vixCumSum[i] = iTemp; @@ -1030,7 +1030,7 @@ public class LibMatrixOuterAgg int[] aix = sblock.indexes(j); double [] avals = sblock.values(j); - for (int i=apos; i < apos+alen; ++i) { + for (int i=apos; i < apos+alen; i++) { int cnt = sumEqNe(avals[i], bv, bOp); out.quickSetValue(0, aix[i], cnt); } @@ -1447,7 +1447,7 @@ public class LibMatrixOuterAgg } else if(bOp.fn instanceof Equals) { double dFirstValue = vmb[0]; int i=0; - while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i; + while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++; if (i < vmb.length-1) vix[0] = i+1; else @@ -1455,7 +1455,7 @@ public class LibMatrixOuterAgg } else if(bOp.fn instanceof NotEquals) { double dFirstValue = vmb[0]; int i=0; - while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i; + while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++; if (i < vmb.length-1) vix[0] = i-1; else @@ -1520,10 +1520,10 @@ public class LibMatrixOuterAgg { int iCurInd = 0; - for (int i = 0; i < vix.length;++i) + for (int i = 0; i < vix.length;i++) { double dPrevVal = vmb[iCurInd]; - while(i<vix.length && dPrevVal == vmb[i]) ++i; + while(i<vix.length && dPrevVal == vmb[i]) i++; if(i < vix.length) { for(int j=iCurInd; j<i; ++j) vix[j] = vix[i]; @@ -1555,9 +1555,9 @@ public class LibMatrixOuterAgg int iLastIndex = 0; double dLastVal = vix[iLastIndex]; - for (int i = 0; i < vix.length-1; ++i) + for (int i = 0; i < vix.length-1; i++) { - while(i<vmb.length-1 && dLastVal == vmb[i+1]) ++i; + while(i<vmb.length-1 && dLastVal == vmb[i+1]) i++; for (int j=iLastIndex+1; j<=i; ++j) vix[j] = vix[iLastIndex]; if (i < vix.length-1) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 48b0b2d..842982d 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -5069,7 +5069,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab // keep scanning the weights, until we hit the required position <code>fromPos</code> while ( count < fromPos ) { count += quickGetValue(index,1); - ++index; + index++; } double runningSum; @@ -5086,7 +5086,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab runningSum += (val * Math.min(wt, selectRange-selectedCount)); selectedCount += Math.min(wt, selectRange-selectedCount); count += wt; - ++index; + index++; } //System.out.println(fromPos + ", " + toPos + ": " + count + ", "+ runningSum + ", " + selectedCount); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java index 9c66518..c29ef5a 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java @@ -108,7 +108,7 @@ extends SequenceFileInputFormat<K,V> // take N samples from different parts of the input int totalcount = 0; - for(int i=0; i < samples; ++i) { + for(int i=0; i < samples; i++) { SequenceFileRecordReader reader = (SequenceFileRecordReader) inFormat.getRecordReader(splits[sampleStep * i], conf, null); int count=0; @@ -227,7 +227,7 @@ extends SequenceFileInputFormat<K,V> float stepSize = numRecords / (float) numPartitions; //System.out.println("Step size is " + stepSize); ArrayList<WritableComparable> result = new ArrayList<WritableComparable>(numPartitions-1); - for(int i=1; i < numPartitions; ++i) { + for(int i=1; i < numPartitions; i++) { result.add(records.get(Math.round(stepSize * i))); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/DataConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java index 08a6e8d..c790ae9 100644 --- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java @@ -956,8 +956,8 @@ public class DataConverter } } } else { // Block is in dense format - for (int i=0; i<rowLength; ++i){ - for (int j=0; j<colLength; ++j){ + for (int i=0; i<rowLength; i++){ + for (int j=0; j<colLength; j++){ double value = mb.getValue(i, j); if (value != 0.0){ sb.append(i+1).append(separator).append(j+1).append(separator); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index fa17fcd..88221f2 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -23,6 +23,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.instructions.InstructionUtils; @@ -615,4 +618,45 @@ public class UtilFunctions return in1.getDataType(); } + + /* + * This function will convert Frame schema into DataFrame schema + * + * @param schema + * Frame schema in the form of List<ValueType> + * @return + * Returns the DataFrame schema (StructType) + */ + public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema) + { + // Generate the schema based on the string of schema + List<StructField> fields = new ArrayList<StructField>(); + + int i = 1; + for (ValueType schema : lschema) + { + org.apache.spark.sql.types.DataType dataType = DataTypes.StringType; + switch(schema) + { + case STRING: + dataType = DataTypes.StringType; + break; + case DOUBLE: + dataType = DataTypes.DoubleType; + break; + case INT: + dataType = DataTypes.LongType; + break; + case BOOLEAN: + dataType = DataTypes.BooleanType; + break; + default: + System.out.println("Default schema type is String."); + } + fields.add(DataTypes.createStructField("C"+i++, dataType, true)); + } + + return DataTypes.createStructType(fields); + } + } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java index 20c4a27..0d3b932 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java @@ -214,7 +214,7 @@ public class FrameAppendDistTest extends AutomatedTestBase } private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) { - for ( int i=0; i<frame1.getNumRows(); ++i ) + for ( int i=0; i<frame1.getNumRows(); i++ ) for( int j=0; j<frame1.getNumColumns(); j++ ) { Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j))); Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j))); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index 7a8392b..441a63b 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -27,9 +27,15 @@ import java.util.List; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.StructType; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.parser.Expression.ValueType; @@ -65,6 +71,7 @@ import org.junit.Test; + public class FrameConverterTest extends AutomatedTestBase { private final static String TEST_DIR = "functions/frame/"; @@ -100,8 +107,12 @@ public class FrameConverterTest extends AutomatedTestBase BIN2TXTCELL, MAT2BIN, BIN2MAT, + DFRM2BIN, + BIN2DFRM, } + private static String separator = ","; + @Override public void setUp() { TestUtils.clearAssertionInformation(); @@ -178,7 +189,16 @@ public class FrameConverterTest extends AutomatedTestBase runFrameConverterTest(schemaMixedLarge, ConvType.BIN2MAT); } - + @Test + public void testFrameMixedDFrameBinSpark() { + runFrameConverterTest(schemaMixedLarge, ConvType.DFRM2BIN); + } + + @Test + public void testFrameMixedBinDFrameSpark() { + runFrameConverterTest(schemaMixedLarge, ConvType.BIN2DFRM); + } + /** * * @param schema @@ -204,7 +224,8 @@ public class FrameConverterTest extends AutomatedTestBase OutputInfo oinfo = null; InputInfo iinfo = null; switch( type ) { - case CSV2BIN: + case CSV2BIN: + case DFRM2BIN: oinfo = OutputInfo.CSVOutputInfo; iinfo = InputInfo.BinaryBlockInputInfo; break; @@ -221,6 +242,7 @@ public class FrameConverterTest extends AutomatedTestBase iinfo = InputInfo.TextCellInputInfo; break; case MAT2BIN: + case BIN2DFRM: oinfo = OutputInfo.BinaryBlockOutputInfo; iinfo = InputInfo.BinaryBlockInputInfo; break; @@ -389,7 +411,7 @@ public class FrameConverterTest extends AutomatedTestBase * @param frame2 */ private void verifyFrameData(FrameBlock frame1, FrameBlock frame2) { - for ( int i=0; i<frame1.getNumRows(); ++i ) + for ( int i=0; i<frame1.getNumRows(); i++ ) for( int j=0; j<frame1.getNumColumns(); j++ ) { String val1 = UtilFunctions.objectToString(frame1.get(i, j)); String val2 = UtilFunctions.objectToString(frame2.get(i, j)); @@ -444,7 +466,7 @@ public class FrameConverterTest extends AutomatedTestBase OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils - .csvToBinaryBlock(sc, rddIn, mc, false, ",", false, 0) + .csvToBinaryBlock(sc, rddIn, mc, false, separator, false, 0) .mapToPair(new LongFrameToLongWritableFrameFunction()); rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); break; @@ -494,10 +516,69 @@ public class FrameConverterTest extends AutomatedTestBase rddOut.saveAsHadoopFile(fnameOut, MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass); break; } + case DFRM2BIN: { + OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; + + //Create DataFrame + SQLContext sqlContext = new SQLContext(sc); + StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema); + JavaRDD<Row> rowRDD = getRowRDD(sc, fnameIn, separator); + DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema); + + JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils + .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/) + .mapToPair(new LongFrameToLongWritableFrameFunction()); + rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); + break; + } + case BIN2DFRM: { + InputInfo iinfo = InputInfo.BinaryBlockInputInfo; + OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; + JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class); + JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new LongWritableFrameToLongFrameFunction()); + DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc); + + //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary + JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils + .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/) + .mapToPair(new LongFrameToLongWritableFrameFunction()); + rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); + + break; + } default: throw new RuntimeException("Unsuported converter type: "+type.toString()); } sec.close(); } + + /* + * It will return JavaRDD<Row> based on csv data input file. + */ + JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String separator) + { + // Load a text file and convert each line to a java rdd. + JavaRDD<String> dataRdd = sc.textFile(fnameIn); + return dataRdd.map(new RowGenerator()); + } + + /* + * Row Generator class based on individual line in CSV file. + */ + private static class RowGenerator implements Function<String,Row> + { + private static final long serialVersionUID = -6736256507697511070L; + + @Override + public Row call(String record) throws Exception { + String[] fields = record.split(","); + Object[] objects = new Object[fields.length]; + for (int i=0; i<fields.length; i++) { + objects[i] = fields[i]; + } + return RowFactory.create(objects); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java index 9bc8d14..e713a86 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java @@ -172,7 +172,7 @@ public class FrameCopyTest extends AutomatedTestBase void verifyFrameData(FrameBlock frame1, FrameBlock frame2) { List<ValueType> lschema = frame1.getSchema(); - for ( int i=0; i<frame1.getNumRows(); ++i ) + for ( int i=0; i<frame1.getNumRows(); i++ ) for( int j=0; j<lschema.size(); j++ ) { if( UtilFunctions.compareTo(lschema.get(j), frame1.get(i, j), frame2.get(i, j)) != 0) Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame1.get(i, j) + http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java index b43dc6b..86dee49 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java @@ -268,7 +268,7 @@ public class FrameIndexingDistTest extends AutomatedTestBase } private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, ValueType[] schema) { - for ( int i=0; i<frame1.getNumRows(); ++i ) + for ( int i=0; i<frame1.getNumRows(); i++ ) for( int j=0; j<frame1.getNumColumns(); j++ ) { Object val1 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame1.get(i, j))); Object val2 = UtilFunctions.stringToObject(schema[j], UtilFunctions.objectToString(frame2.get(i, j))); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java index 7d45ebc..d46c11f 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java @@ -215,7 +215,7 @@ public class FrameReadWriteTest extends AutomatedTestBase void verifyFrameData(FrameBlock frame1, FrameBlock frame2) { List<ValueType> lschema = frame1.getSchema(); - for ( int i=0; i<frame1.getNumRows(); ++i ) + for ( int i=0; i<frame1.getNumRows(); i++ ) for( int j=0; j<lschema.size(); j++ ) { if( UtilFunctions.compareTo(lschema.get(j), frame1.get(i, j), frame2.get(i, j)) != 0) Assert.fail("Target value for cell ("+ i + "," + j + ") is " + frame1.get(i, j) +
