Repository: incubator-systemml
Updated Branches:
  refs/heads/master 821c5f50d -> 14e9f6443


[SYSTEMML-560] Frame DataFrame converters


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/14e9f644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/14e9f644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/14e9f644

Branch: refs/heads/master
Commit: 14e9f64439ce58acd3b3cdaf53e75a768a4757f0
Parents: 821c5f5
Author: Arvind Surve <[email protected]>
Authored: Fri Jul 22 22:34:27 2016 -0700
Committer: Arvind Surve <[email protected]>
Committed: Fri Jul 22 22:34:27 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     | 214 +++++++++++++++++--
 .../apache/sysml/runtime/io/FrameReader.java    |   2 +-
 .../runtime/matrix/data/LibMatrixBincell.java   |   2 +-
 .../runtime/matrix/data/LibMatrixOuterAgg.java  |  26 +--
 .../sysml/runtime/matrix/data/MatrixBlock.java  |   4 +-
 .../matrix/sort/SamplingSortMRInputFormat.java  |   4 +-
 .../sysml/runtime/util/DataConverter.java       |   4 +-
 .../sysml/runtime/util/UtilFunctions.java       |  44 ++++
 .../functions/frame/FrameAppendDistTest.java    |   2 +-
 .../functions/frame/FrameConverterTest.java     |  89 +++++++-
 .../functions/frame/FrameCopyTest.java          |   2 +-
 .../functions/frame/FrameIndexingDistTest.java  |   2 +-
 .../functions/frame/FrameReadWriteTest.java     |   2 +-
 13 files changed, 349 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
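
The core of the change is a pair of new public converters in FrameRDDConverterUtils:
dataFrameToBinaryBlock() and binaryBlockToDataFrame(). As a minimal usage sketch (not
part of the patch; it assumes an existing JavaSparkContext sc, a Spark 1.x DataFrame df
without an "ID" column, and the usual MatrixCharacteristics(rows, cols, brlen, bclen)
constructor), the round trip looks roughly like this:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
    import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;

    //unknown dimensions (-1) are inferred by the converter via a count() job
    MatrixCharacteristics mc = new MatrixCharacteristics(-1, -1, 1000, 1000);

    //DataFrame -> binary-block frame RDD; throws DMLRuntimeException
    JavaPairRDD<Long, FrameBlock> binFrame =
        FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false);

    //binary-block frame RDD -> DataFrame; schema is taken from the first frame block
    DataFrame dfOut = FrameRDDConverterUtils.binaryBlockToDataFrame(binFrame, mc, sc);

Note that binaryBlockToDataFrame() reads the frame schema via first() and creates its
own SQLContext from the passed JavaSparkContext.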


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 052d3f3..c243eeb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -36,6 +36,13 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 import scala.Tuple2;
 
@@ -101,8 +108,7 @@ public class FrameRDDConverterUtils
                
                //convert csv rdd to binary block rdd (w/ partial blocks)
                JavaPairRDD<Long, FrameBlock> out = prepinput
-                               .mapPartitionsToPair(new 
CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill))
-                               .mapToPair(new 
LongWritableFrameToLongFrameFunction());
+                               .mapPartitionsToPair(new 
CSVToBinaryBlockFunction(mcOut, hasHeader, delim, fill));
                
                return out;
        }
@@ -300,8 +306,70 @@ public class FrameRDDConverterUtils
                return out;
        }
        
+       //=====================================
+       // DataFrame <--> Binary block
 
+       /**
+        * Converts a Spark DataFrame into a binary-block frame RDD.
+        * 
+        * @param sc the Java Spark context
+        * @param df the input DataFrame
+        * @param mcOut output matrix characteristics (unknown dimensions are inferred)
+        * @param containsID true if the DataFrame contains an "ID" column to drop
+        * @return pair RDD of row index and FrameBlock
+        * @throws DMLRuntimeException
+        */
+       public static JavaPairRDD<Long, FrameBlock> 
dataFrameToBinaryBlock(JavaSparkContext sc,
+                       DataFrame df, MatrixCharacteristics mcOut, boolean 
containsID) 
+               throws DMLRuntimeException 
+       {
+               
+               if(containsID)
+                       df = df.drop("ID");
+               
+               //determine unknown dimensions if required
+               if( !mcOut.dimsKnown(true) ) {
+                       JavaRDD<Row> tmp = df.javaRDD();
+                       long rlen = tmp.count();
+                       long clen = df.columns().length; //"ID" column, if any, was dropped above
+                       mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), 
mcOut.getColsPerBlock(), -1);
+               }
+               
+               JavaPairRDD<Row, Long> prepinput = df.javaRDD()
+                               .zipWithIndex(); //zip row index
+               
+               //convert rdd to binary block rdd
+               JavaPairRDD<Long, FrameBlock> out = prepinput
+                               .mapPartitionsToPair(new 
DataFrameToBinaryBlockFunction(mcOut));
+               
+               return out;
+       }
+
+       /**
+        * Converts a binary-block frame RDD into a Spark DataFrame.
+        * 
+        * @param in input pair RDD of row index and FrameBlock
+        * @param mcIn input matrix characteristics
+        * @param sc the Java Spark context
+        * @return the resulting DataFrame
+        */
+       public static DataFrame 
binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics 
mcIn, JavaSparkContext sc)
+       {
+               List<ValueType> schema = in.first()._2().getSchema();
+               
+               //convert binary block to rows rdd (from blocks/rows)
+               JavaRDD<Row> rowRDD = in.flatMap(new 
BinaryBlockToDataFrameFunction());
+                               
+               SQLContext sqlContext = new SQLContext(sc);
+               StructType dfSchema = 
UtilFunctions.convertFrameSchemaToDFSchema(schema);
+               DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+       
+               return df;
+       }
        
+
        /////////////////////////////////
        // CSV-SPECIFIC FUNCTIONS
        
@@ -391,7 +459,7 @@ public class FrameRDDConverterUtils
         * In terms of memory consumption this is better than creating partial 
blocks of row segments.
         * 
         */
-       private static class CSVToBinaryBlockFunction implements 
PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,LongWritable,FrameBlock> 
+       private static class CSVToBinaryBlockFunction implements 
PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,Long,FrameBlock> 
        {
                private static final long serialVersionUID = 
-1976803898174960086L;
 
@@ -413,12 +481,12 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<Tuple2<LongWritable, FrameBlock>> 
call(Iterator<Tuple2<Text,Long>> arg0) 
+               public Iterable<Tuple2<Long, FrameBlock>> 
call(Iterator<Tuple2<Text,Long>> arg0) 
                        throws Exception 
                {
-                       ArrayList<Tuple2<LongWritable,FrameBlock>> ret = new 
ArrayList<Tuple2<LongWritable,FrameBlock>>();
+                       ArrayList<Tuple2<Long,FrameBlock>> ret = new 
ArrayList<Tuple2<Long,FrameBlock>>();
 
-                       LongWritable[] ix = new LongWritable[1];
+                       Long[] ix = new Long[1];
                        FrameBlock[] mb = new FrameBlock[1];
                        int iRowsInBlock = 0;
                        
@@ -467,10 +535,10 @@ public class FrameRDDConverterUtils
                }
                
                // Creates new state of empty column blocks for current global 
row index.
-               private void createBlocks(long rowix, LongWritable[] ix, 
FrameBlock[] mb)
+               private void createBlocks(long rowix, Long[] ix, FrameBlock[] 
mb)
                {
                        //compute row block index and number of column blocks
-                       ix[0] = new LongWritable(rowix);
+                       ix[0] = rowix;
                        mb[0] = new FrameBlock((int)_clen, ValueType.STRING);
                        if( _colnames != null )
                                mb[0].setColumnNames(_colnames);
@@ -481,17 +549,6 @@ public class FrameRDDConverterUtils
                                for( int j=0; j<_clen; j++ )
                                        
mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
                }
-               
-               // Flushes current state of filled column blocks to output list.
-               private void flushBlocksToList( LongWritable[] ix, FrameBlock[] 
mb, ArrayList<Tuple2<LongWritable,FrameBlock>> ret ) 
-                       throws DMLRuntimeException
-               {
-                       int len = ix.length;                    
-                       for( int i=0; i<len; i++ )
-                               if( mb[i] != null ) {
-                                       ret.add(new 
Tuple2<LongWritable,FrameBlock>(ix[i],mb[i]));
-                               }       
-               }
        }
        
        /**
@@ -558,6 +615,111 @@ public class FrameRDDConverterUtils
                        return ret;
                }
        }
+       
+       /////////////////////////////////
+       // DataFrame-SPECIFIC FUNCTIONS
+       
+       private static class DataFrameToBinaryBlockFunction implements 
PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,Long,FrameBlock> 
+       {
+               private static final long serialVersionUID = 
2269315691094111843L;
+
+               private long _clen = -1;
+               private int _maxRowsPerBlock = -1;
+               
+               public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) 
{
+                       _clen = mc.getCols();
+                       _maxRowsPerBlock = Math.max((int) 
(FrameBlock.BUFFER_SIZE/_clen), 1);
+               }
+               
+               @Override
+               public Iterable<Tuple2<Long, FrameBlock>> 
call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
+                       ArrayList<Tuple2<Long,FrameBlock>> ret = new 
ArrayList<Tuple2<Long,FrameBlock>>();
+
+                       Long[] ix = new Long[1];
+                       FrameBlock[] mb = new FrameBlock[1];
+                       int iRowsInBlock = 0;
+                       
+                       while( arg0.hasNext() )
+                       {
+                               Tuple2<Row,Long> tmp = arg0.next();
+                               Row row = tmp._1();
+                               long rowix = tmp._2()+1;
+                               
+                               if( iRowsInBlock == 0 || iRowsInBlock == 
_maxRowsPerBlock) {
+                                       if( iRowsInBlock == _maxRowsPerBlock )
+                                               flushBlocksToList(ix, mb, ret);
+                                       createBlocks(rowix, ix, mb, row);
+                                       iRowsInBlock = 0;
+                               }
+                               
+                               //process row data
+                               Object[] parts = rowToObjectArray(row, 
(int)_clen, mb[0].getSchema());
+                               mb[0].appendRow(parts);
+                               iRowsInBlock++;
+                       }
+               
+                       //flush last blocks
+                       flushBlocksToList(ix, mb, ret);
+               
+                       return ret;
+               }
+               
+               public Object[] rowToObjectArray(Row row, int _clen, 
List<ValueType> schema) throws Exception {
+                       Object[] ret = new Object[_clen];
+                       for(int i = 0; i < row.length(); i++)
+                               ret[i] = 
UtilFunctions.objectToObject(schema.get(i), row.get(i));
+                       for(int i=row.length(); i<_clen; i++)
+                               ret[i] = "";
+                       return ret;
+               }
+
+               // Creates new state of empty column blocks for current global 
row index.
+               private void createBlocks(long rowix, Long[] ix, FrameBlock[] 
mb, Row row)
+               {
+                       //set the row index and derive the column schema from the DataFrame row
+                       ix[0] = rowix;
+                       
+                       List<String> columns = new ArrayList<String>();
+                       List<ValueType> schema = new ArrayList<ValueType>();
+                       for (StructField structType: row.schema().fields()) {
+                               columns.add(structType.name());
+                               if(structType.dataType() == 
DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType)
+                                       schema.add(ValueType.DOUBLE);
+                               else if(structType.dataType() == 
DataTypes.LongType || structType.dataType() == DataTypes.IntegerType)
+                                       schema.add(ValueType.INT);
+                               else if(structType.dataType() == 
DataTypes.BooleanType)
+                                       schema.add(ValueType.BOOLEAN);
+                               else
+                                       schema.add(ValueType.STRING);
+                       }
+                       mb[0] = new FrameBlock(schema);
+                       mb[0].setColumnNames(columns);
+               }
+       }
+
+       /**
+        * Converts each (row index, FrameBlock) pair into the corresponding DataFrame rows.
+        */
+       private static class BinaryBlockToDataFrameFunction implements 
FlatMapFunction<Tuple2<Long,FrameBlock>,Row> 
+       {
+               private static final long serialVersionUID = 
8093340778966667460L;
+               
+               @Override
+               public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0)
+                       throws Exception 
+               {
+                       FrameBlock blk = arg0._2();
+                       ArrayList<Row> ret = new ArrayList<Row>();
+
+                       //handle Frame block data
+                       Iterator<Object[]> iter = blk.getObjectRowIterator();
+                       while( iter.hasNext() )
+                               ret.add(RowFactory.create(iter.next().clone()));
+                               
+                       return ret;
+               }
+       }
+       
        /////////////////////////////////
        // TEXTCELL-SPECIFIC FUNCTIONS
        
@@ -808,4 +970,18 @@ public class FrameRDDConverterUtils
                        return ret;
                }
        }
+       
+       //////////////////////////////////////
+       // Common functions
+       
+       // Flushes current state of filled column blocks to output list.
+       private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, 
ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+               throws DMLRuntimeException
+       {
+               int len = ix.length;                    
+               for( int i=0; i<len; i++ )
+                       if( mb[i] != null ) {
+                               ret.add(new 
Tuple2<Long,FrameBlock>(ix[i],mb[i]));
+                       }       
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index d37bbde..e318fff 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -104,7 +104,7 @@ public abstract class FrameReader
                throws IOException, DMLRuntimeException
        {
                List<String> colNames = new ArrayList<String>();
-               for (int i=0; i < clen; ++i)
+               for (int i=0; i < clen; i++)
                        colNames.add("C"+i);
                return colNames;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
index dd0b9e0..6ad08d8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixBincell.java
@@ -781,7 +781,7 @@ public class LibMatrixBincell
                        if( ixPos1 >= 0 ){ //match, scan to next val
                                if(bOp.fn instanceof LessThan || bOp.fn 
instanceof GreaterThanEquals 
                                                || bOp.fn instanceof Equals || 
bOp.fn instanceof NotEquals)
-                                       while( ixPos1<bv.length && 
value==bv[ixPos1]  ) ++ixPos1;
+                                       while( ixPos1<bv.length && 
value==bv[ixPos1]  ) ixPos1++;
                                if(bOp.fn instanceof GreaterThan || bOp.fn 
instanceof LessThanEquals 
                                                || bOp.fn instanceof Equals || 
bOp.fn instanceof NotEquals)
                                        while(  ixPos2 > 0 && 
value==bv[ixPos2-1]) --ixPos2;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
index dab24a0..25d3067 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixOuterAgg.java
@@ -184,10 +184,10 @@ public class LibMatrixOuterAgg
                        
                        double dvix[] = new double[vix.length];
                        if (bPrimeCumSum)
-                               for (int i = 0; i< vix.length; ++i)
+                               for (int i = 0; i< vix.length; i++)
                                        dvix[vix.length-1-i] = vix[i];
                        else
-                               for (int i = 0; i< vix.length; ++i)
+                               for (int i = 0; i< vix.length; i++)
                                        dvix[i] = vix[i];
                        
                        MatrixBlock mbix = 
DataConverter.convertToMatrixBlock(dvix, true);
@@ -197,7 +197,7 @@ public class LibMatrixOuterAgg
                        
                        vixCumSum = DataConverter.convertToIntVector(mbResult); 
 
                        if (bPrimeCumSum)
-                               for (int i = 0; i< (vixCumSum.length+1)/2; ++i) 
{
+                               for (int i = 0; i< (vixCumSum.length+1)/2; i++) 
{
                                        int iTemp = 
vixCumSum[vixCumSum.length-1-i];
                                        vixCumSum[vixCumSum.length-1-i] = 
vixCumSum[i];
                                        vixCumSum[i] = iTemp;
@@ -264,10 +264,10 @@ public class LibMatrixOuterAgg
                        
                        double dvix[] = new double[vix.length];
                        if (bPrimeCumSum)
-                               for (int i = 0; i< vix.length; ++i)
+                               for (int i = 0; i< vix.length; i++)
                                        dvix[vix.length-1-i] = vix[i];
                        else
-                               for (int i = 0; i< vix.length; ++i)
+                               for (int i = 0; i< vix.length; i++)
                                        dvix[i] = vix[i];
                        
                        MatrixBlock mbix = 
DataConverter.convertToMatrixBlock(dvix, true);
@@ -277,7 +277,7 @@ public class LibMatrixOuterAgg
                        
                        vixCumSum = DataConverter.convertToIntVector(mbResult); 
 
                        if (bPrimeCumSum)
-                               for (int i = 0; i< (vixCumSum.length+1)/2; ++i) 
{
+                               for (int i = 0; i< (vixCumSum.length+1)/2; i++) 
{
                                        int iTemp = 
vixCumSum[vixCumSum.length-1-i];
                                        vixCumSum[vixCumSum.length-1-i] = 
vixCumSum[i];
                                        vixCumSum[i] = iTemp;
@@ -1030,7 +1030,7 @@ public class LibMatrixOuterAgg
                        int[] aix = sblock.indexes(j);
                        double [] avals = sblock.values(j);
                        
-                       for (int i=apos; i < apos+alen; ++i) {
+                       for (int i=apos; i < apos+alen; i++) {
                                int cnt = sumEqNe(avals[i], bv, bOp);
                                out.quickSetValue(0, aix[i], cnt);
                        }
@@ -1447,7 +1447,7 @@ public class LibMatrixOuterAgg
        } else if(bOp.fn instanceof Equals) {
                double dFirstValue = vmb[0];
                int i=0;
-               while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i;
+               while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++;
                if (i < vmb.length-1) 
                        vix[0] = i+1;
                else    
@@ -1455,7 +1455,7 @@ public class LibMatrixOuterAgg
        } else if(bOp.fn instanceof NotEquals) {
                double dFirstValue = vmb[0];
                int i=0;
-               while(i<vmb.length-1 && dFirstValue == vmb[i+1]) ++i;
+               while(i<vmb.length-1 && dFirstValue == vmb[i+1]) i++;
                if (i < vmb.length-1) 
                        vix[0] = i-1;
                else    
@@ -1520,10 +1520,10 @@ public class LibMatrixOuterAgg
        {
        int iCurInd = 0;
                
-       for (int i = 0; i < vix.length;++i)
+       for (int i = 0; i < vix.length;i++)
        {
                double dPrevVal = vmb[iCurInd];
-                       while(i<vix.length && dPrevVal == vmb[i]) ++i;
+                       while(i<vix.length && dPrevVal == vmb[i]) i++;
                        
                        if(i < vix.length) {
                                for(int j=iCurInd; j<i; ++j) vix[j] = vix[i];
@@ -1555,9 +1555,9 @@ public class LibMatrixOuterAgg
                int iLastIndex = 0;
                double dLastVal = vix[iLastIndex];
 
-       for (int i = 0; i < vix.length-1; ++i)
+       for (int i = 0; i < vix.length-1; i++)
        {
-               while(i<vmb.length-1 && dLastVal == vmb[i+1]) ++i;
+               while(i<vmb.length-1 && dLastVal == vmb[i+1]) i++;
                for (int j=iLastIndex+1; j<=i; ++j) 
                        vix[j] = vix[iLastIndex];
                if (i < vix.length-1) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 48b0b2d..842982d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5069,7 +5069,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                // keep scanning the weights, until we hit the required 
position <code>fromPos</code>
                while ( count < fromPos ) {
                        count += quickGetValue(index,1);
-                       ++index;
+                       index++;
                }
                
                double runningSum; 
@@ -5086,7 +5086,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        runningSum += (val * Math.min(wt, 
selectRange-selectedCount));
                        selectedCount += Math.min(wt, 
selectRange-selectedCount);
                        count += wt;
-                       ++index;
+                       index++;
                }
                
                //System.out.println(fromPos + ", " + toPos + ": " + count + ", 
"+ runningSum + ", " + selectedCount);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
index 9c66518..c29ef5a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java
@@ -108,7 +108,7 @@ extends SequenceFileInputFormat<K,V>
            // take N samples from different parts of the input
            
            int totalcount = 0;
-           for(int i=0; i < samples; ++i) {
+           for(int i=0; i < samples; i++) {
                SequenceFileRecordReader reader = 
                        (SequenceFileRecordReader) 
inFormat.getRecordReader(splits[sampleStep * i], conf, null);
                int count=0;
@@ -227,7 +227,7 @@ extends SequenceFileInputFormat<K,V>
                        float stepSize = numRecords / (float) numPartitions;
                        //System.out.println("Step size is " + stepSize);
                        ArrayList<WritableComparable> result = new 
ArrayList<WritableComparable>(numPartitions-1);
-                       for(int i=1; i < numPartitions; ++i) {
+                       for(int i=1; i < numPartitions; i++) {
                                result.add(records.get(Math.round(stepSize * 
i)));
                        }
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java 
b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index 08a6e8d..c790ae9 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -956,8 +956,8 @@ public class DataConverter
                                        }
                                }
                        } else {        // Block is in dense format
-                               for (int i=0; i<rowLength; ++i){
-                                       for (int j=0; j<colLength; ++j){
+                               for (int i=0; i<rowLength; i++){
+                                       for (int j=0; j<colLength; j++){
                                                double value = mb.getValue(i, 
j);
                                                if (value != 0.0){
                                                        
sb.append(i+1).append(separator).append(j+1).append(separator);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index fa17fcd..88221f2 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,6 +23,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -615,4 +618,45 @@ public class UtilFunctions
        
                return in1.getDataType();
        }
+       
+       /**
+        * Converts a frame schema into the corresponding DataFrame schema.
+        * 
+        *  @param      lschema
+        *              Frame schema in the form of List<ValueType>
+        *  @return
+        *              the DataFrame schema (StructType)
+        */
+       public static StructType convertFrameSchemaToDFSchema(List<ValueType> 
lschema)
+       {
+               // Generate the DataFrame schema, one StructField per frame column
+               List<StructField> fields = new ArrayList<StructField>();
+               
+               int i = 1;
+               for (ValueType schema : lschema)
+               {
+                       org.apache.spark.sql.types.DataType dataType = 
DataTypes.StringType;
+                       switch(schema)
+                       {
+                               case STRING:
+                                       dataType = DataTypes.StringType;
+                                       break;
+                               case DOUBLE:
+                                       dataType = DataTypes.DoubleType;
+                                       break;
+                               case INT:
+                                       dataType = DataTypes.LongType;
+                                       break;
+                               case BOOLEAN:
+                                       dataType = DataTypes.BooleanType;
+                                       break;
+                               default:
+                                       System.out.println("Default schema type 
is String.");
+                       }
+                       fields.add(DataTypes.createStructField("C"+i++, 
dataType, true));
+               }
+               
+               return DataTypes.createStructType(fields);
+       }
+       
 }
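
The new UtilFunctions.convertFrameSchemaToDFSchema() helper can also be used on its own
when assembling a DataFrame by hand. A quick sketch (not from the patch); the generated
column names (C1, C2, ...) and the INT-to-LongType / default-to-StringType mapping follow
the switch above:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.types.StructType;
    import org.apache.sysml.parser.Expression.ValueType;
    import org.apache.sysml.runtime.util.UtilFunctions;

    List<ValueType> frameSchema = Arrays.asList(
        ValueType.STRING, ValueType.DOUBLE, ValueType.INT, ValueType.BOOLEAN);

    //yields StructType(C1: string, C2: double, C3: bigint, C4: boolean), all nullable
    StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(frameSchema);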

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
index 20c4a27..0d3b932 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameAppendDistTest.java
@@ -214,7 +214,7 @@ public class FrameAppendDistTest extends AutomatedTestBase
        }
        
        private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, 
ValueType[] schema) {
-               for ( int i=0; i<frame1.getNumRows(); ++i )
+               for ( int i=0; i<frame1.getNumRows(); i++ )
                        for( int j=0; j<frame1.getNumColumns(); j++ )   {
                                Object val1 = 
UtilFunctions.stringToObject(schema[j], 
UtilFunctions.objectToString(frame1.get(i, j)));
                                Object val2 = 
UtilFunctions.stringToObject(schema[j], 
UtilFunctions.objectToString(frame2.get(i, j)));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 7a8392b..441a63b 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -27,9 +27,15 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -65,6 +71,7 @@ import org.junit.Test;
 
 
 
+
 public class FrameConverterTest extends AutomatedTestBase
 {
        private final static String TEST_DIR = "functions/frame/";
@@ -100,8 +107,12 @@ public class FrameConverterTest extends AutomatedTestBase
                BIN2TXTCELL,
                MAT2BIN,
                BIN2MAT,
+               DFRM2BIN,
+               BIN2DFRM,
        }
        
+       private static String separator = ",";
+       
        @Override
        public void setUp() {
                TestUtils.clearAssertionInformation();
@@ -178,7 +189,16 @@ public class FrameConverterTest extends AutomatedTestBase
                runFrameConverterTest(schemaMixedLarge, ConvType.BIN2MAT);
        }
        
-       
+       @Test
+       public void testFrameMixedDFrameBinSpark()  {
+               runFrameConverterTest(schemaMixedLarge, ConvType.DFRM2BIN);
+       }
+               
+       @Test
+       public void testFrameMixedBinDFrameSpark()  {
+               runFrameConverterTest(schemaMixedLarge, ConvType.BIN2DFRM);
+       }
+               
        /**
         * 
         * @param schema
@@ -204,7 +224,8 @@ public class FrameConverterTest extends AutomatedTestBase
                        OutputInfo oinfo = null;
                        InputInfo iinfo = null;
                        switch( type ) {
-                               case CSV2BIN: 
+                               case CSV2BIN:
+                               case DFRM2BIN:
                                        oinfo = OutputInfo.CSVOutputInfo;
                                        iinfo = InputInfo.BinaryBlockInputInfo;
                                        break;
@@ -221,6 +242,7 @@ public class FrameConverterTest extends AutomatedTestBase
                                        iinfo = InputInfo.TextCellInputInfo;
                                        break;
                                case MAT2BIN: 
+                               case BIN2DFRM:
                                        oinfo = 
OutputInfo.BinaryBlockOutputInfo;
                                        iinfo = InputInfo.BinaryBlockInputInfo;
                                        break;
@@ -389,7 +411,7 @@ public class FrameConverterTest extends AutomatedTestBase
         * @param frame2
         */
        private void verifyFrameData(FrameBlock frame1, FrameBlock frame2) {
-               for ( int i=0; i<frame1.getNumRows(); ++i )
+               for ( int i=0; i<frame1.getNumRows(); i++ )
                        for( int j=0; j<frame1.getNumColumns(); j++ )   {
                                String val1 = 
UtilFunctions.objectToString(frame1.get(i, j));
                                String val2 = 
UtilFunctions.objectToString(frame2.get(i, j));                           
@@ -444,7 +466,7 @@ public class FrameConverterTest extends AutomatedTestBase
                                OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
                                JavaPairRDD<LongWritable,Text> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, 
iinfo.inputValueClass);
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils
-                                               .csvToBinaryBlock(sc, rddIn, 
mc, false, ",", false, 0)
+                                               .csvToBinaryBlock(sc, rddIn, 
mc, false, separator, false, 0)
                                                .mapToPair(new 
LongFrameToLongWritableFrameFunction());
                                rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
                                break;
@@ -494,10 +516,69 @@ public class FrameConverterTest extends AutomatedTestBase
                                rddOut.saveAsHadoopFile(fnameOut, 
MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass);
                                break;
                        }
+                       case DFRM2BIN: {
+                               OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+
+                               //Create DataFrame 
+                               SQLContext sqlContext = new SQLContext(sc);
+                               StructType dfSchema = 
UtilFunctions.convertFrameSchemaToDFSchema(schema);
+                               JavaRDD<Row> rowRDD = getRowRDD(sc, fnameIn, 
separator);
+                               DataFrame df = 
sqlContext.createDataFrame(rowRDD, dfSchema);
+                               
+                               JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils
+                                               .dataFrameToBinaryBlock(sc, df, 
mc, false/*, columns*/)
+                                               .mapToPair(new 
LongFrameToLongWritableFrameFunction());
+                               rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+                               break;
+                       }
+                       case BIN2DFRM: {
+                               InputInfo iinfo = 
InputInfo.BinaryBlockInputInfo;
+                               OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+                               JavaPairRDD<LongWritable, FrameBlock> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, 
FrameBlock.class);
+                               JavaPairRDD<Long, FrameBlock> rddIn2 = 
rddIn.mapToPair(new LongWritableFrameToLongFrameFunction());
+                               DataFrame df = 
FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc);
+                               
+                               //Convert the DataFrame back to binary block, so the original
+                               //and round-tripped (binary -> DataFrame -> binary) frames can be compared
+                               JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils
+                                               .dataFrameToBinaryBlock(sc, df, 
mc, false/*, columns*/)
+                                               .mapToPair(new 
LongFrameToLongWritableFrameFunction());
+                               rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+                       
+                               break;
+                       }
                        default: 
                                throw new RuntimeException("Unsuported 
converter type: "+type.toString());
                }
                
                sec.close();
        }
+       
+       /* 
+        * Returns a JavaRDD<Row> created from the CSV input file.
+        */
+       JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String 
separator)
+       {
+               // Load a text file and convert each line to a java rdd.
+               JavaRDD<String> dataRdd = sc.textFile(fnameIn);
+               return dataRdd.map(new RowGenerator());
+       }
+       
+       /* 
+        * Row generator that creates a Row from a single CSV line.
+        */
+       private static class RowGenerator implements Function<String,Row> 
+       {
+               private static final long serialVersionUID = 
-6736256507697511070L;
+
+               @Override
+               public Row call(String record) throws Exception {
+                     String[] fields = record.split(",");
+                     Object[] objects = new Object[fields.length]; 
+                     for (int i=0; i<fields.length; i++) {
+                             objects[i] = fields[i];
+                     }
+                     return RowFactory.create(objects);
+               }
+       }
+
 }
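
Putting the pieces together, the new DFRM2BIN path in the test amounts to: read a
comma-separated text file, wrap each line in a Row, attach the schema produced by
convertFrameSchemaToDFSchema(), and hand the DataFrame to dataFrameToBinaryBlock(). A
rough sketch (the file name and the all-string schema are placeholders, sc is the same
JavaSparkContext as above, and a Java 8 lambda stands in for the test's RowGenerator):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.StructType;
    import org.apache.sysml.parser.Expression.ValueType;
    import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
    import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
    import org.apache.sysml.runtime.matrix.data.FrameBlock;
    import org.apache.sysml.runtime.util.UtilFunctions;

    SQLContext sqlContext = new SQLContext(sc);

    //split every CSV line into string fields, one Row per line
    JavaRDD<Row> rows = sc.textFile("in/frame.csv")
        .map(line -> RowFactory.create((Object[]) line.split(",")));

    //an all-string frame schema keeps the Row contents consistent with the declared types
    List<ValueType> schema = Arrays.asList(ValueType.STRING, ValueType.STRING, ValueType.STRING);
    StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema);
    DataFrame df = sqlContext.createDataFrame(rows, dfSchema);

    MatrixCharacteristics mc = new MatrixCharacteristics(-1, -1, 1000, 1000);
    JavaPairRDD<Long, FrameBlock> frame =
        FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false);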

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
index 9bc8d14..e713a86 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameCopyTest.java
@@ -172,7 +172,7 @@ public class FrameCopyTest extends AutomatedTestBase
        void verifyFrameData(FrameBlock frame1, FrameBlock frame2)
        {
                List<ValueType> lschema = frame1.getSchema();
-               for ( int i=0; i<frame1.getNumRows(); ++i )
+               for ( int i=0; i<frame1.getNumRows(); i++ )
                        for( int j=0; j<lschema.size(); j++ )   {
                                if( UtilFunctions.compareTo(lschema.get(j), 
frame1.get(i, j), frame2.get(i, j)) != 0)
                                        Assert.fail("Target value for cell ("+ 
i + "," + j + ") is " + frame1.get(i,  j) + 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
index b43dc6b..86dee49 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameIndexingDistTest.java
@@ -268,7 +268,7 @@ public class FrameIndexingDistTest extends AutomatedTestBase
        }
        
        private void verifyFrameData(FrameBlock frame1, FrameBlock frame2, 
ValueType[] schema) {
-               for ( int i=0; i<frame1.getNumRows(); ++i )
+               for ( int i=0; i<frame1.getNumRows(); i++ )
                        for( int j=0; j<frame1.getNumColumns(); j++ )   {
                                Object val1 = 
UtilFunctions.stringToObject(schema[j], 
UtilFunctions.objectToString(frame1.get(i, j)));
                                Object val2 = 
UtilFunctions.stringToObject(schema[j], 
UtilFunctions.objectToString(frame2.get(i, j)));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/14e9f644/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
index 7d45ebc..d46c11f 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
@@ -215,7 +215,7 @@ public class FrameReadWriteTest extends AutomatedTestBase
        void verifyFrameData(FrameBlock frame1, FrameBlock frame2)
        {
                List<ValueType> lschema = frame1.getSchema();
-               for ( int i=0; i<frame1.getNumRows(); ++i )
+               for ( int i=0; i<frame1.getNumRows(); i++ )
                        for( int j=0; j<lschema.size(); j++ )   {
                                if( UtilFunctions.compareTo(lschema.get(j), 
frame1.get(i, j), frame2.get(i, j)) != 0)
                                        Assert.fail("Target value for cell ("+ 
i + "," + j + ") is " + frame1.get(i,  j) + 
