[SYSTEMML-914][SYSTEMML-919] Rework dataframe-matrix converters, tests

The existing dataframe-matrix and matrix-dataframe converters had a
variety of issues. This patch completely overhauls these converters and
moves them into the well-tested RDDConverterUtils. In detail, this
includes the following fixes and improvements:

(1) Consolidation of the redundant dataframe converters from
RDDConverterUtilsExt, MLContextConversionUtil, and MLOutput.

(2) Fixed missing block sizes and missing updates of the matrix
characteristics after dataframe conversion; see the excerpt below.
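
For illustration, the reworked converter now guards against invalid
block sizes before conversion (excerpt from the new
RDDConverterUtils.dataFrameToBinaryBlock in the diff below):

    //ensure valid blocksizes
    if( mc.getRowsPerBlock() <= 1 || mc.getColsPerBlock() <= 1 ) {
        mc.setBlockSize(ConfigurationManager.getBlocksize());
    }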

(3) Fixed wrong matrix characteristics (clen) for row datasets with a
contained ID column and unknown dimensions. This was critical because
it led to incorrect results on write and other operations; the fix is
sketched below.
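
The fix derives clen from the dataframe schema minus the ID column
(simplified excerpt from the new converter, where containsID and
isVector are its parameters and tmp is the analyzed row RDD):

    //exclude the ID column from the derived number of columns
    long clen = !isVector ?
        df.columns().length - (containsID ? 1 : 0) :
        ((Vector) tmp.first().get(containsID ? 1 : 0)).size();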

(4) Various performance improvements: (a) reuse of an existing row ID
column instead of sorting by row ID, which also avoids zipWithIndex
(see the excerpt below), (b) significantly less object creation and
fewer redundant data copies, (c) row shuffle as dense/sparse blocks
instead of bloated object arrays, (d) no double parsing of cell values
via exception handling, (e) dedicated handling of sparse/dense blocks,
and (f) avoidance of unnecessary double parsing during nnz analysis.
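
Regarding (a), an existing ID column is now mapped directly to row
indexes instead of sorting and zipping (excerpt from the new
converter):

    //reuse ID column as row index, otherwise zip a row index
    JavaPairRDD<Row, Long> prepinput = containsID ?
        df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
        df.javaRDD().zipWithIndex(); //zip row index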

(5) General code quality improvements (e.g., collapsed bloated if-else
chains into simple conjunctive predicates, removed commented-out
experimental code). Finally, a usage sketch of the consolidated
converter API (1) follows.
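
As a minimal usage sketch of the consolidated entry points (sc, df,
sqlContext, and blksz are illustrative caller-side variables, not part
of this patch):

    //dataframe to binary-block matrix (no ID column, double columns)
    MatrixCharacteristics mc = new MatrixCharacteristics(-1, -1, blksz, blksz);
    JavaPairRDD<MatrixIndexes, MatrixBlock> in = RDDConverterUtils
        .dataFrameToBinaryBlock(sc, df, mc, false, false);

    //binary-block matrix back to a dataframe with columns __INDEX, C1..Cn
    DataFrame out = RDDConverterUtils.binaryBlockToDataFrame(sqlContext, in, mc, false);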

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/085009a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/085009a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/085009a3

Branch: refs/heads/master
Commit: 085009a367519d437558b2004ac93d8b6ea60195
Parents: 00b06d4
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Thu Sep 15 09:40:46 2016 +0200
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Thu Sep 15 09:41:34 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    |   5 +-
 .../java/org/apache/sysml/api/MLOutput.java     | 277 +---------
 .../functions/ConvertSingleColumnToString.java  |  34 --
 .../api/ml/functions/ConvertVectorToDouble.java |  35 --
 .../api/mlcontext/MLContextConversionUtil.java  | 193 ++-----
 .../sysml/api/mlcontext/MLContextUtil.java      |  31 +-
 .../instructions/spark/WriteSPInstruction.java  |   3 +-
 .../spark/utils/RDDConverterUtils.java          | 313 +++++++++++
 .../spark/utils/RDDConverterUtilsExt.java       | 523 +------------------
 .../runtime/matrix/MatrixCharacteristics.java   |  10 +-
 .../sysml/runtime/util/DataConverter.java       |  27 +
 .../sysml/runtime/util/UtilFunctions.java       |  28 +-
 .../mlcontext/DataFrameConversionTest.java      | 196 +++++++
 .../functions/mlcontext/GNMFTest.java           |   3 +-
 14 files changed, 643 insertions(+), 1035 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index c5588e2..1e415f1 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -76,7 +76,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFuncti
 import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.SparkListener;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -294,7 +294,8 @@ public class MLContext {
        public void registerInput(String varName, DataFrame df, boolean containsID) throws DMLRuntimeException {
                int blksz = ConfigurationManager.getBlocksize();
                MatrixCharacteristics mcOut = new MatrixCharacteristics(-1, -1, blksz, blksz);
-               JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = RDDConverterUtilsExt.dataFrameToBinaryBlock(new JavaSparkContext(_sc), df, mcOut, containsID);
+               JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = RDDConverterUtils
+                               .dataFrameToBinaryBlock(new JavaSparkContext(_sc), df, mcOut, containsID, false);
                registerInput(varName, rdd, mcOut);
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index f41c479..ac1b3f3 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -19,39 +19,24 @@
 
 package org.apache.sysml.api;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.mllib.linalg.DenseVector;
-import org.apache.spark.mllib.linalg.VectorUDT;
 import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.functions.GetMLBlock;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.util.UtilFunctions;
-
-import scala.Tuple2;
 
 /**
  * This is a simple container object that returns the output of execute from MLContext 
@@ -114,7 +99,7 @@ public class MLOutput {
                JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
                if(rdd != null) {
                        MatrixCharacteristics mc = _outMetadata.get(varName);
-                       return RDDConverterUtilsExt.binaryBlockToDataFrame(rdd, mc, sqlContext);
+                       return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, false);
                }
                throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
        }
@@ -135,7 +120,7 @@ public class MLOutput {
                        JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName);
                        if(rdd != null) {
                                MatrixCharacteristics mc = _outMetadata.get(varName);
-                               return RDDConverterUtilsExt.binaryBlockToVectorDataFrame(rdd, mc, sqlContext);
+                               return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, true);
                        }
                        throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
                }
@@ -153,62 +138,25 @@ public class MLOutput {
         * @return
         * @throws DMLRuntimeException
         */
-       public DataFrame getDF(SQLContext sqlContext, String varName, Map<String, Tuple2<Long, Long>> range) throws DMLRuntimeException {
-               if(sqlContext == null) {
+       public DataFrame getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc) 
+               throws DMLRuntimeException 
+       {
+               if(sqlContext == null)
                        throw new DMLRuntimeException("SQLContext is not created.");
-               }
+                       
                JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName);
-               if(binaryBlockRDD == null) {
-                       throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
-               }
-               MatrixCharacteristics mc = _outMetadata.get(varName);
-               long rlen = mc.getRows(); long clen = mc.getCols();
-               int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock();
-               
-               ArrayList<Tuple2<String, Tuple2<Long, Long>>> alRange = new ArrayList<Tuple2<String, Tuple2<Long, Long>>>();
-               for(Entry<String, Tuple2<Long, Long>> e : range.entrySet()) {
-                       alRange.add(new Tuple2<String, Tuple2<Long,Long>>(e.getKey(), e.getValue()));
-               }
-               
-               // Very expensive operation here: groupByKey (where number of keys might be too large)
-               JavaRDD<Row> rowsRDD = binaryBlockRDD.flatMapToPair(new ProjectRows(rlen, clen, brlen, bclen))
-                               .groupByKey().map(new ConvertDoubleArrayToRangeRows(clen, bclen, alRange));
-
-               int numColumns = (int) clen;
-               if(numColumns <= 0) {
-                       throw new DMLRuntimeException("Output dimensions unknown after executing the script and hence cannot create the dataframe");
-               }
-               
-               List<StructField> fields = new ArrayList<StructField>();
-               // LongTypes throw an error: java.lang.Double incompatible with java.lang.Long
-               fields.add(DataTypes.createStructField("__INDEX", DataTypes.DoubleType, false));
-               for(int k = 0; k < alRange.size(); k++) {
-                       String colName = alRange.get(k)._1;
-                       long low = alRange.get(k)._2._1;
-                       long high = alRange.get(k)._2._2;
-                       if(low != high)
-                               fields.add(DataTypes.createStructField(colName, new VectorUDT(), false));
-                       else
-                               fields.add(DataTypes.createStructField(colName, DataTypes.DoubleType, false));
-               }
-               
-               // This will cause infinite recursion due to bug in Spark
-               // https://issues.apache.org/jira/browse/SPARK-6999
-               // return sqlContext.createDataFrame(rowsRDD, colNames); // where ArrayList<String> colNames
-               return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
-               
+               return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, binaryBlockRDD, mc, true);
        }
        
        public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException {
                if(format.equals("text")) {
                        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryRDD = getBinaryBlockedRDD(varName);
                        MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); 
-                       return RDDConverterUtilsExt.binaryBlockToStringRDD(binaryRDD, mcIn, format);
+                       return RDDConverterUtils.binaryBlockToTextCell(binaryRDD, mcIn);
                }
                else {
                        throw new DMLRuntimeException("The output format:" + format + " is not implemented yet.");
                }
-               
        }
        
        public JavaRDD<String> getStringFrameRDD(String varName, String format, CSVFileFormatProperties fprop ) throws DMLRuntimeException {
@@ -247,209 +195,4 @@ public class MLOutput {
                }
                throw new DMLRuntimeException("Variable " + varName + " not 
found in the output symbol table.");
        }
-       
-//     /**
-//      * Experimental: Please use this with caution as it will fail in many corner cases.
-//      * @return org.apache.spark.mllib.linalg.distributed.BlockMatrix
-//      * @throws DMLRuntimeException 
-//      */
-//     public BlockMatrix getMLLibBlockedMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
-//             return getMLMatrix(ml, sqlContext, varName).toBlockedMatrix();
-//     }
-       
-       public static class ProjectRows implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, Long, Tuple2<Long, Double[]>> {
-               private static final long serialVersionUID = -4792573268900472749L;
-               long rlen; long clen;
-               int brlen; int bclen;
-               public ProjectRows(long rlen, long clen, int brlen, int bclen) {
-                       this.rlen = rlen;
-                       this.clen = clen;
-                       this.brlen = brlen;
-                       this.bclen = bclen;
-               }
-
-               @Override
-               public Iterable<Tuple2<Long, Tuple2<Long, Double[]>>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
-                       // ------------------------------------------------------------------
-               //      Compute local block size: 
-               // Example: For matrix: 1500 X 1100 with block length 1000 X 1000
-               // We will have four local block sizes (1000X1000, 1000X100, 500X1000 and 500X1000)
-               long blockRowIndex = kv._1.getRowIndex();
-               long blockColIndex = kv._1.getColumnIndex();
-               int lrlen = UtilFunctions.computeBlockSize(rlen, blockRowIndex, brlen);
-               int lclen = UtilFunctions.computeBlockSize(clen, blockColIndex, bclen);
-               // ------------------------------------------------------------------
-                       
-                       long startRowIndex = (kv._1.getRowIndex()-1) * bclen + 1;
-                       MatrixBlock blk = kv._2;
-                       ArrayList<Tuple2<Long, Tuple2<Long, Double[]>>> retVal = new ArrayList<Tuple2<Long,Tuple2<Long,Double[]>>>();
-                       for(int i = 0; i < lrlen; i++) {
-                               Double[] partialRow = new Double[lclen];
-                               for(int j = 0; j < lclen; j++) {
-                                       partialRow[j] = blk.getValue(i, j);
-                               }
-                               retVal.add(new Tuple2<Long, Tuple2<Long,Double[]>>(startRowIndex + i, new Tuple2<Long,Double[]>(kv._1.getColumnIndex(), partialRow)));
-                       }
-                       return retVal;
-               }
-       }
-       
-       public static class ConvertDoubleArrayToRows implements Function<Tuple2<Long, Iterable<Tuple2<Long, Double[]>>>, Row> {
-               private static final long serialVersionUID = 4441184411670316972L;
-               
-               int bclen; long clen;
-               boolean outputVector;
-               public ConvertDoubleArrayToRows(long clen, int bclen, boolean outputVector) {
-                       this.bclen = bclen;
-                       this.clen = clen;
-                       this.outputVector = outputVector;
-               }
-
-               @Override
-               public Row call(Tuple2<Long, Iterable<Tuple2<Long, Double[]>>> arg0)
-                               throws Exception {
-                       
-                       HashMap<Long, Double[]> partialRows = new HashMap<Long, Double[]>();
-                       int sizeOfPartialRows = 0;
-                       for(Tuple2<Long, Double[]> kv : arg0._2) {
-                               partialRows.put(kv._1, kv._2);
-                               sizeOfPartialRows += kv._2.length;
-                       }
-                       
-                       // Insert first row as row index
-                       Object[] row = null;
-                       if(outputVector) {
-                               row = new Object[2];
-                               double [] vecVals = new double[sizeOfPartialRows];
-                               
-                               for(long columnBlockIndex = 1; columnBlockIndex <= partialRows.size(); columnBlockIndex++) {
-                                       if(partialRows.containsKey(columnBlockIndex)) {
-                                               Double [] array = partialRows.get(columnBlockIndex);
-                                               // ------------------------------------------------------------------
-                                               //      Compute local block size: 
-                                               int lclen = UtilFunctions.computeBlockSize(clen, columnBlockIndex, bclen);
-                                               // ------------------------------------------------------------------
-                                               if(array.length != lclen) {
-                                                       throw new Exception("Incorrect double array provided by ProjectRows");
-                                               }
-                                               for(int i = 0; i < lclen; i++) {
-                                                       vecVals[(int) ((columnBlockIndex-1)*bclen + i)] = array[i];
-                                               }
-                                       }
-                                       else {
-                                               throw new Exception("The block for column index " + columnBlockIndex + " is missing. Make sure the last instruction is not returning empty blocks");
-                                       }
-                               }
-                               
-                               long rowIndex = arg0._1;
-                               row[0] = (double) rowIndex;
-                               row[1] = new DenseVector(vecVals); // breeze.util.JavaArrayOps.arrayDToDv(vecVals);
-                       }
-                       else {
-                               row = new Double[sizeOfPartialRows + 1];
-                               long rowIndex = arg0._1;
-                               row[0] = (double) rowIndex;
-                               for(long columnBlockIndex = 1; columnBlockIndex <= partialRows.size(); columnBlockIndex++) {
-                                       if(partialRows.containsKey(columnBlockIndex)) {
-                                               Double [] array = partialRows.get(columnBlockIndex);
-                                               // ------------------------------------------------------------------
-                                               //      Compute local block size: 
-                                               int lclen = UtilFunctions.computeBlockSize(clen, columnBlockIndex, bclen);
-                                               // ------------------------------------------------------------------
-                                               if(array.length != lclen) {
-                                                       throw new Exception("Incorrect double array provided by ProjectRows");
-                                               }
-                                               for(int i = 0; i < lclen; i++) {
-                                                       row[(int) ((columnBlockIndex-1)*bclen + i) + 1] = array[i];
-                                               }
-                                       }
-                                       else {
-                                               throw new Exception("The block for column index " + columnBlockIndex + " is missing. Make sure the last instruction is not returning empty blocks");
-                                       }
-                               }
-                       }
-                       Object[] row_fields = row;
-                       return RowFactory.create(row_fields);
-               }
-       }
-       
-       
-       public static class ConvertDoubleArrayToRangeRows implements Function<Tuple2<Long, Iterable<Tuple2<Long, Double[]>>>, Row> {
-               private static final long serialVersionUID = 4441184411670316972L;
-               
-               int bclen; long clen;
-               ArrayList<Tuple2<String, Tuple2<Long, Long>>> range;
-               public ConvertDoubleArrayToRangeRows(long clen, int bclen, ArrayList<Tuple2<String, Tuple2<Long, Long>>> range) {
-                       this.bclen = bclen;
-                       this.clen = clen;
-                       this.range = range;
-               }
-
-               @Override
-               public Row call(Tuple2<Long, Iterable<Tuple2<Long, Double[]>>> arg0)
-                               throws Exception {
-                       
-                       HashMap<Long, Double[]> partialRows = new HashMap<Long, Double[]>();
-                       int sizeOfPartialRows = 0;
-                       for(Tuple2<Long, Double[]> kv : arg0._2) {
-                               partialRows.put(kv._1, kv._2);
-                               sizeOfPartialRows += kv._2.length;
-                       }
-                       
-                       // Insert first row as row index
-                       Object[] row = new Object[range.size() + 1];
-                       
-                       double [] vecVals = new double[sizeOfPartialRows];
-                       
-                       for(long columnBlockIndex = 1; columnBlockIndex <= partialRows.size(); columnBlockIndex++) {
-                               if(partialRows.containsKey(columnBlockIndex)) {
-                                       Double [] array = partialRows.get(columnBlockIndex);
-                                       // ------------------------------------------------------------------
-                                       //      Compute local block size: 
-                                       int lclen = UtilFunctions.computeBlockSize(clen, columnBlockIndex, bclen);
-                                       // ------------------------------------------------------------------
-                                       if(array.length != lclen) {
-                                               throw new Exception("Incorrect double array provided by ProjectRows");
-                                       }
-                                       for(int i = 0; i < lclen; i++) {
-                                               vecVals[(int) ((columnBlockIndex-1)*bclen + i)] = array[i];
-                                       }
-                               }
-                               else {
-                                       throw new Exception("The block for column index " + columnBlockIndex + " is missing. Make sure the last instruction is not returning empty blocks");
-                               }
-                       }
-                       
-                       long rowIndex = arg0._1;
-                       row[0] = (double) rowIndex;
-                       
-                       int i = 1;
-                       
-                       //for(Entry<String, Tuple2<Long, Long>> e : range.entrySet()) {
-                       for(int k = 0; k < range.size(); k++) {
-                               long low = range.get(k)._2._1;
-                               long high = range.get(k)._2._2;
-                               
-                               if(high < low) {
-                                       throw new Exception("Incorrect range:" + high + "<" + low);
-                               }
-                               
-                               if(low == high) {
-                                       row[i] = vecVals[(int) (low - 1)];
-                               }
-                               else {
-                                       int lengthOfVector = (int) (high - low + 1);
-                                       double [] tempVector = new double[lengthOfVector];
-                                       for(int j = 0; j < lengthOfVector; j++) {
-                                               tempVector[j] = vecVals[(int) (low + j - 1)];
-                                       }
-                                       row[i] = new DenseVector(tempVector);
-                               }
-                               
-                               i++;
-                       }
-
-                       return RowFactory.create(row);
-               }
-       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java b/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java
deleted file mode 100644
index 1572e41..0000000
--- a/src/main/java/org/apache/sysml/api/ml/functions/ConvertSingleColumnToString.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.api.ml.functions;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-
-public class ConvertSingleColumnToString implements Function<Row, String> {
-
-       private static final long serialVersionUID = -499763403738768970L;
-
-       @Override
-       public String call(Row row) throws Exception {
-               return row.apply(0).toString();
-       }
-}
-       

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java b/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java
deleted file mode 100644
index f24f171..0000000
--- a/src/main/java/org/apache/sysml/api/ml/functions/ConvertVectorToDouble.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.api.ml.functions;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-
-public class ConvertVectorToDouble implements Function<Row, Double> {
-
-       private static final long serialVersionUID = -6612447783777073929L;
-
-       @Override
-       public Double call(Row row) throws Exception {
-               
-               return row.getDouble(0);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 15aa15e..0661a93 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -29,7 +29,6 @@ import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.spark.Accumulator;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -54,10 +53,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.ConvertStringToLong
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.DataFrameAnalysisFunction;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.DataFrameToBinaryBlockFunction;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
@@ -67,7 +63,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.util.DataConverter;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 import scala.collection.JavaConversions;
 import scala.reflect.ClassTag;
@@ -196,14 +191,10 @@ public class MLContextConversionUtil {
        public static FrameObject frameBlockToFrameObject(String variableName, FrameBlock frameBlock,
                        FrameMetadata frameMetadata) {
                try {
-                       MatrixCharacteristics matrixCharacteristics;
-                       if (frameMetadata != null) {
-                               matrixCharacteristics = frameMetadata.asMatrixCharacteristics();
-                       } else {
-                               matrixCharacteristics = new MatrixCharacteristics();
-                       }
-                       MatrixFormatMetaData mtd = new MatrixFormatMetaData(matrixCharacteristics, OutputInfo.BinaryBlockOutputInfo,
-                                       InputInfo.BinaryBlockInputInfo);
+                       MatrixCharacteristics mc = (frameMetadata != null) ? 
+                                       frameMetadata.asMatrixCharacteristics() : new MatrixCharacteristics();
+                       MatrixFormatMetaData mtd = new MatrixFormatMetaData(mc, 
+                                       OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo);
                        FrameObject frameObject = new FrameObject(OptimizerUtils.getUniqueTempFileName(), mtd);
                        frameObject.acquireModify(frameBlock);
                        frameObject.release();
@@ -325,15 +316,11 @@ public class MLContextConversionUtil {
         *         {@code MatrixObject}
         */
        public static MatrixObject dataFrameToMatrixObject(String variableName, DataFrame dataFrame,
-                       MatrixMetadata matrixMetadata) {
-               if (matrixMetadata == null) {
-                       matrixMetadata = new MatrixMetadata();
-               }
-               JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock = MLContextConversionUtil
-                               .dataFrameToMatrixBinaryBlocks(dataFrame, matrixMetadata);
-               MatrixObject matrixObject = MLContextConversionUtil.binaryBlocksToMatrixObject(variableName, binaryBlock,
-                               matrixMetadata);
-               return matrixObject;
+               MatrixMetadata matrixMetadata) 
+       {
+               matrixMetadata = (matrixMetadata!=null) ? matrixMetadata : new MatrixMetadata();
+               JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock = dataFrameToMatrixBinaryBlocks(dataFrame, matrixMetadata);
+               return binaryBlocksToMatrixObject(variableName, binaryBlock, matrixMetadata);
        }
 
        /**
@@ -417,38 +404,27 @@ public class MLContextConversionUtil {
         *         {@code JavaPairRDD<MatrixIndexes,
         *         MatrixBlock>} binary-block matrix
         */
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(DataFrame dataFrame,
-                       MatrixMetadata matrixMetadata) {
-
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(
+                       DataFrame dataFrame, MatrixMetadata matrixMetadata) 
+       {
+               //handle meta data
                determineMatrixFormatIfNeeded(dataFrame, matrixMetadata);
-
-               MatrixCharacteristics matrixCharacteristics;
-               if (matrixMetadata != null) {
-                       matrixCharacteristics = matrixMetadata.asMatrixCharacteristics();
-                       if (matrixCharacteristics == null) {
-                               matrixCharacteristics = new MatrixCharacteristics();
-                       }
-               } else {
-                       matrixCharacteristics = new MatrixCharacteristics();
-               }
-
-               if (isDataFrameWithIDColumn(matrixMetadata)) {
-                       dataFrame = dataFrame.sort("__INDEX").drop("__INDEX");
-               }
-
-               boolean isVectorBasedDataFrame = isVectorBasedDataFrame(matrixMetadata);
-
-               determineDataFrameDimensionsIfNeeded(dataFrame, matrixCharacteristics, isVectorBasedDataFrame);
-               if (matrixMetadata != null) {
-                       // so external reference can be updated with the metadata
-                       matrixMetadata.setMatrixCharacteristics(matrixCharacteristics);
-               }
-
-               JavaRDD<Row> javaRDD = dataFrame.javaRDD();
-               JavaPairRDD<Row, Long> prepinput = javaRDD.zipWithIndex();
-               JavaPairRDD<MatrixIndexes, MatrixBlock> out = prepinput
-                               .mapPartitionsToPair(new DataFrameToBinaryBlockFunction(matrixCharacteristics, isVectorBasedDataFrame));
-               out = RDDAggregateUtils.mergeByKey(out);
+               MatrixCharacteristics mc = (matrixMetadata != null && matrixMetadata.asMatrixCharacteristics()!=null) ?
+                               matrixMetadata.asMatrixCharacteristics() : new MatrixCharacteristics();
+               boolean containsID = isDataFrameWithIDColumn(matrixMetadata);
+               boolean isVector = isVectorBasedDataFrame(matrixMetadata);
+       
+               //get spark context
+               JavaSparkContext sc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
+
+               //convert data frame to binary block matrix
+               JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils
+                               .dataFrameToBinaryBlock(sc, dataFrame, mc, containsID, isVector);
+               
+               //update determined matrix characteristics
+               if( matrixMetadata != null )
+                       matrixMetadata.setMatrixCharacteristics(mc);
+               
                return out;
        }
 
@@ -486,7 +462,7 @@ public class MLContextConversionUtil {
                StructType schema = dataFrame.schema();
                boolean hasID = false;
                try {
-                       schema.fieldIndex("__INDEX");
+                       schema.fieldIndex(RDDConverterUtils.DF_ID_COLUMN);
                        hasID = true;
                } catch (IllegalArgumentException iae) {
                }
@@ -522,14 +498,9 @@ public class MLContextConversionUtil {
         *         otherwise.
         */
        public static boolean isDataFrameWithIDColumn(MatrixMetadata matrixMetadata) {
-               if (matrixMetadata == null) {
-                       return false;
-               }
-               MatrixFormat matrixFormat = matrixMetadata.getMatrixFormat();
-               if (matrixFormat == null) {
-                       return false;
-               }
-               return matrixFormat.hasIDColumn();
+               return (matrixMetadata != null 
+                       && matrixMetadata.getMatrixFormat() != null
+                       && matrixMetadata.getMatrixFormat().hasIDColumn());
        }
 
        /**
@@ -541,14 +512,9 @@ public class MLContextConversionUtil {
         *         otherwise.
         */
        public static boolean isDataFrameWithIDColumn(FrameMetadata frameMetadata) {
-               if (frameMetadata == null) {
-                       return false;
-               }
-               FrameFormat frameFormat = frameMetadata.getFrameFormat();
-               if (frameFormat == null) {
-                       return false;
-               }
-               return frameFormat.hasIDColumn();
+               return (frameMetadata != null 
+                       && frameMetadata.getFrameFormat() != null
+                       && frameMetadata.getFrameFormat().hasIDColumn());
        }
 
        /**
@@ -560,51 +526,9 @@ public class MLContextConversionUtil {
         *         otherwise.
         */
        public static boolean isVectorBasedDataFrame(MatrixMetadata matrixMetadata) {
-               if (matrixMetadata == null) {
-                       return false;
-               }
-               MatrixFormat matrixFormat = matrixMetadata.getMatrixFormat();
-               if (matrixFormat == null) {
-                       return false;
-               }
-               return matrixFormat.isVectorBased();
-       }
-
-       /**
-        * If the {@code DataFrame} dimensions aren't present in the
-        * {@code MatrixCharacteristics} metadata, determine the dimensions and
-        * place them in the {@code MatrixCharacteristics} metadata.
-        * 
-        * @param dataFrame
-        *            the Spark {@code DataFrame}
-        * @param matrixCharacteristics
-        *            the matrix metadata
-        * @param vectorBased
-        *            is the DataFrame vector-based
-        */
-       public static void determineDataFrameDimensionsIfNeeded(DataFrame dataFrame,
-                       MatrixCharacteristics matrixCharacteristics, boolean vectorBased) {
-               if (!matrixCharacteristics.dimsKnown(true)) {
-                       MLContext activeMLContext = (MLContext) MLContextProxy.getActiveMLContextForAPI();
-                       SparkContext sparkContext = activeMLContext.getSparkContext();
-                       @SuppressWarnings("resource")
-                       JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
-
-                       Accumulator<Double> aNnz = javaSparkContext.accumulator(0L);
-                       JavaRDD<Row> javaRDD = dataFrame.javaRDD().map(new DataFrameAnalysisFunction(aNnz, vectorBased));
-                       long numRows = javaRDD.count();
-                       long numColumns;
-                       if (vectorBased) {
-                               Vector v = (Vector) javaRDD.first().get(0);
-                               numColumns = v.size();
-                       } else {
-                               numColumns = dataFrame.columns().length;
-                       }
-
-                       long numNonZeros = UtilFunctions.toLong(aNnz.value());
-                       matrixCharacteristics.set(numRows, numColumns, matrixCharacteristics.getRowsPerBlock(),
-                                       matrixCharacteristics.getColsPerBlock(), numNonZeros);
-               }
+               return (matrixMetadata != null 
+                       && matrixMetadata.getMatrixFormat() != null
+                       && matrixMetadata.getMatrixFormat().isVectorBased());
        }
 
        /**
@@ -864,14 +788,8 @@ public class MLContextConversionUtil {
         */
        public static JavaRDD<String> binaryBlockMatrixToJavaRDDStringIJV(BinaryBlockMatrix binaryBlockMatrix) {
                JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlock = binaryBlockMatrix.getBinaryBlocks();
-               MatrixCharacteristics matrixCharacteristics = binaryBlockMatrix.getMatrixCharacteristics();
-               try {
-                       JavaRDD<String> javaRDDString = RDDConverterUtilsExt.binaryBlockToStringRDD(binaryBlock,
-                                       matrixCharacteristics, "text");
-                       return javaRDDString;
-               } catch (DMLRuntimeException e) {
-                       throw new MLContextException("Exception converting BinaryBlockMatrix to JavaRDD<String> (ijv)", e);
-               }
+               MatrixCharacteristics mc = binaryBlockMatrix.getMatrixCharacteristics();
+               return RDDConverterUtils.binaryBlockToTextCell(binaryBlock, mc);
        }
 
        /**
@@ -1285,21 +1203,14 @@ public class MLContextConversionUtil {
                        @SuppressWarnings("unchecked")
                        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockMatrix = (JavaPairRDD<MatrixIndexes, MatrixBlock>) sparkExecutionContext
                                        .getRDDHandleForMatrixObject(matrixObject, InputInfo.BinaryBlockInputInfo);
-                       MatrixCharacteristics matrixCharacteristics = matrixObject.getMatrixCharacteristics();
+                       MatrixCharacteristics mc = matrixObject.getMatrixCharacteristics();

-                       MLContext activeMLContext = (MLContext) MLContextProxy.getActiveMLContextForAPI();
-                       SparkContext sc = activeMLContext.getSparkContext();
-                       SQLContext sqlContext = new SQLContext(sc);
-                       DataFrame df = null;
-                       if (isVectorDF) {
-                               df = RDDConverterUtilsExt.binaryBlockToVectorDataFrame(binaryBlockMatrix, matrixCharacteristics,
-                                               sqlContext);
-                       } else {
-                               df = RDDConverterUtilsExt.binaryBlockToDataFrame(binaryBlockMatrix, matrixCharacteristics, sqlContext);
-                       }
-
-                       return df;
-               } catch (DMLRuntimeException e) {
+                       SparkContext sc = ((MLContext) MLContextProxy.getActiveMLContextForAPI()).getSparkContext();
+                       SQLContext sqlctx = new SQLContext(sc);
+                       
+                       return RDDConverterUtils.binaryBlockToDataFrame(sqlctx, binaryBlockMatrix, mc, isVectorDF);
+               } 
+               catch (DMLRuntimeException e) {
                        throw new MLContextException("DMLRuntimeException while converting matrix object to DataFrame", e);
                }
        }
@@ -1321,11 +1232,11 @@
                                        .getRDDHandleForFrameObject(frameObject, InputInfo.BinaryBlockInputInfo);
                        MatrixCharacteristics matrixCharacteristics = frameObject.getMatrixCharacteristics();

-                       JavaSparkContext jsc = MLContextUtil
-                                       .getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
+                       JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());

                        return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryBlockFrame, matrixCharacteristics, jsc);
-               } catch (DMLRuntimeException e) {
+               } 
+               catch (DMLRuntimeException e) {
                        throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e);
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index 5b4e736..6c75048 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -976,18 +976,9 @@ public final class MLContextUtil {
         *         FrameObject, {@code false} otherwise.
         */
        public static boolean doesSymbolTableContainFrameObject(LocalVariableMap symbolTable, String variableName) {
-               if (symbolTable == null) {
-                       return false;
-               }
-               Data data = symbolTable.get(variableName);
-               if (data == null) {
-                       return false;
-               }
-               if (data instanceof FrameObject) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return (symbolTable != null
+                       && symbolTable.keySet().contains(variableName)
+                       && symbolTable.get(variableName) instanceof FrameObject);
        }
 
        /**
@@ -1002,18 +993,8 @@ public final class MLContextUtil {
         *         MatrixObject, {@code false} otherwise.
         */
        public static boolean doesSymbolTableContainMatrixObject(LocalVariableMap symbolTable, String variableName) {
-               if (symbolTable == null) {
-                       return false;
-               }
-               Data data = symbolTable.get(variableName);
-               if (data == null) {
-                       return false;
-               }
-               if (data instanceof MatrixObject) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return (symbolTable != null
+                       && symbolTable.keySet().contains(variableName)
+                       && symbolTable.get(variableName) instanceof MatrixObject);
        }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 0cfec66..e4e2606 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -36,7 +36,6 @@ import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
-import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
@@ -192,7 +191,7 @@ public class WriteSPInstruction extends SPInstruction
                                header = sec.getSparkContext().parallelize(headerContainer);
                        }
                        
-                       JavaRDD<String> ijv = in1.flatMap(new ConvertMatrixBlockToIJVLines(mc.getRowsPerBlock(), mc.getColsPerBlock()));
+                       JavaRDD<String> ijv = RDDConverterUtils.binaryBlockToTextCell(in1, mc);
                        if(header != null)
                                customSaveTextFile(header.union(ijv), fname, true);
                        else

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 3998a3f..17bdea8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -34,14 +35,25 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.mllib.linalg.DenseVector;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.VectorUDT;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
 
 import scala.Tuple2;
 
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
+import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -57,6 +69,8 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class RDDConverterUtils 
 {
+       public static final String DF_ID_COLUMN = "__INDEX";
+       
        /**
         * 
         * @param sc
@@ -135,6 +149,17 @@ public class RDDConverterUtils
        /**
         * 
         * @param in
+        * @param mc
+        * @return
+        */
+       public static JavaRDD<String> binaryBlockToTextCell(JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc) {
+               return in.flatMap(new ConvertMatrixBlockToIJVLines(
+                               mc.getRowsPerBlock(), mc.getColsPerBlock()));
+       }
+       
+       /**
+        * 
+        * @param in
         * @param mcIn
         * @param props
         * @param strict
@@ -244,6 +269,85 @@ public class RDDConverterUtils
        
        /**
         * 
+        * @param sc
+        * @param df
+        * @param mcOut
+        * @param containsID
+        * @param isVector
+        * @param columns
+        * @return
+        * @throws DMLRuntimeException
+        */
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
+                       DataFrame df, MatrixCharacteristics mc, boolean containsID, boolean isVector) 
+       {
+               //determine unknown dimensions and sparsity if required
+               if( !mc.dimsKnown(true) ) {
+                       Accumulator<Double> aNnz = sc.accumulator(0L);
+                       JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, containsID, isVector));
+                       long rlen = tmp.count();
+                       long clen = !isVector ? df.columns().length - (containsID?1:0) : 
+                                       ((Vector) tmp.first().get(containsID?1:0)).size();
+                       long nnz = UtilFunctions.toLong(aNnz.value());
+                       mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), nnz);
+               }
+               
+               //ensure valid blocksizes
+               if( mc.getRowsPerBlock()<=1 || mc.getColsPerBlock()<=1 ) {
+                       mc.setBlockSize(ConfigurationManager.getBlocksize());
+               }
+               
+               JavaPairRDD<Row, Long> prepinput = containsID ?
+                               df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
+                               df.javaRDD().zipWithIndex(); //zip row index
+               
+               //convert csv rdd to binary block rdd (w/ partial blocks)
+               JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
+                               prepinput.mapPartitionsToPair(
+                                       new DataFrameToBinaryBlockFunction(mc, containsID, isVector));
+               
+               //aggregate partial matrix blocks
+               out = RDDAggregateUtils.mergeByKey( out ); 
+               
+               return out;
+       }
+       
+       /**
+        * 
+        * @param sqlContext
+        * @param in
+        * @param mc
+        * @param toVector
+        * @return
+        * @throws DMLRuntimeException
+        */
+       public static DataFrame binaryBlockToDataFrame(SQLContext sqlContext, 
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)  
+       {
+               if( !mc.colsKnown() )
+                       throw new RuntimeException("Number of columns needed to convert binary block to data frame.");
+               
+               //slice blocks into rows, align and convert into data frame rows
+               JavaRDD<Row> rowsRDD = in
+                       .flatMapToPair(new SliceBinaryBlockToRowsFunction(mc.getRowsPerBlock()))
+                       .groupByKey().map(new ConvertRowBlocksToRows((int)mc.getCols(), mc.getColsPerBlock(), toVector));
+               
+               //create data frame schema
+               List<StructField> fields = new ArrayList<StructField>();
+               fields.add(DataTypes.createStructField(DF_ID_COLUMN, DataTypes.DoubleType, false));
+               if( toVector )
+                       fields.add(DataTypes.createStructField("C1", new VectorUDT(), false));
+               else { // row
+                       for(int i = 1; i <= mc.getCols(); i++)
+                               fields.add(DataTypes.createStructField("C"+i, DataTypes.DoubleType, false));
+               }
+               
+               //rdd to data frame conversion
+               return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
+       }
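A corresponding sketch for the reverse direction, assuming an existing SQLContext sqlctx and the blocks/mc pair from the previous example:

   DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, blocks, mc, false);
   //resulting schema: __INDEX (double), C1..Cn (double); with toVector=true: __INDEX, C1 (vector)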
+       
+       /**
+        * 
         * @param in
         * @return
         */
@@ -751,4 +855,213 @@ public class RDDConverterUtils
                        return new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(rowIndex, 1),out);
                }               
        }
        }
+
+       /////////////////////////////////
+       // DATAFRAME-SPECIFIC FUNCTIONS
+
+       /**
+        * Maps partitions of (Row, 0-based index) pairs into partial binary
+        * blocks, keeping all column blocks of the current row block in memory.
+        */
+       private static class DataFrameToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,MatrixIndexes,MatrixBlock> 
+       {
+               private static final long serialVersionUID = 653447740362447236L;
+               
+               private long _rlen = -1;
+               private long _clen = -1;
+               private int _brlen = -1;
+               private int _bclen = -1;
+               private boolean _sparse = false;
+               private boolean _containsID;
+               private boolean _isVector;
+               
+               public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean containsID, boolean isVector) {
+                       _rlen = mc.getRows();
+                       _clen = mc.getCols();
+                       _brlen = mc.getRowsPerBlock();
+                       _bclen = mc.getColsPerBlock();
+                       _sparse = mc.nnzKnown() && MatrixBlock.evalSparseFormatInMemory(
+                                       mc.getRows(), mc.getCols(), mc.getNonZeros());
+                       _containsID = containsID;
+                       _isVector = isVector;
+               }
+               
+               @Override
+               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<Row, Long>> arg0) 
+                       throws Exception 
+               {
+                       ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+                       
+                       int ncblks = (int)Math.ceil((double)_clen/_bclen);
+                       MatrixIndexes[] ix = new MatrixIndexes[ncblks];
+                       MatrixBlock[] mb = new MatrixBlock[ncblks];
+                       
+                       while( arg0.hasNext() )
+                       {
+                               Tuple2<Row,Long> tmp = arg0.next();
+                               long rowix = tmp._2() + 1;
+                               
+                               long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+                               int pos = UtilFunctions.computeCellInBlock(rowix, _brlen);
+                       
+                               //create new blocks for entire row
+                               if( ix[0] == null || ix[0].getRowIndex() != rix ) {
+                                       if( ix[0] !=null )
+                                               flushBlocksToList(ix, mb, ret);
+                                       long len = UtilFunctions.computeBlockSize(_rlen, rix, _brlen);
+                                       createBlocks(rowix, (int)len, ix, mb);
+                               }
+                               
+                               //process row data
+                               int off = _containsID ? 1 : 0;
+                               if( _isVector ) {
+                                       Vector vect = (Vector) tmp._1().get(off);
+                                       for( int cix=1, pix=0; cix<=ncblks; cix++ ) {
+                                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+                                               for( int j=0; j<lclen; j++ )
+                                                       mb[cix-1].appendValue(pos, j, vect.apply(pix++));
+                                       }       
+                               }
+                               else { //row
+                                       Row row = tmp._1();
+                                       for( int cix=1, pix=off; cix<=ncblks; cix++ ) {
+                                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+                                               for( int j=0; j<lclen; j++ )
+                                                       mb[cix-1].appendValue(pos, j, UtilFunctions.getDouble(row.get(pix++)));
+                                       }
+                               }
+                       }
+               
+                       //flush last blocks
+                       flushBlocksToList(ix, mb, ret);
+               
+                       return ret;             
+               }
+               
+               // Creates new state of empty column blocks for current global row index.
+               private void createBlocks(long rowix, int lrlen, MatrixIndexes[] ix, MatrixBlock[] mb)
+               {
+                       //compute row block index and number of column blocks
+                       long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+                       int ncblks = (int)Math.ceil((double)_clen/_bclen);
+                       
+                       //create all column blocks (sparse/dense according to nnz-based format decision)
+                       for( int cix=1; cix<=ncblks; cix++ ) {
+                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+                               ix[cix-1] = new MatrixIndexes(rix, cix);
+                               mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse);             
+                       }
+               }
+               
+               // Flushes current state of filled column blocks to output list.
+               private void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+                       throws DMLRuntimeException
+               {
+                       int len = ix.length;                    
+                       for( int i=0; i<len; i++ )
+                               if( mb[i] != null ) {
+                                       ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ix[i],mb[i]));
+                                       mb[i].examSparsity(); //ensure right representation
+                               }       
+               }
+       }
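The row-to-block mapping above relies on the usual 1-based blocking arithmetic; a small sketch of the assumed semantics of computeBlockIndex/computeCellInBlock (values illustrative):

   long rowix = 1001;                      //1-based global row index
   int  brlen = 1000;                      //rows per block
   long rix = (rowix - 1) / brlen + 1;     //-> 2 (1-based row block index)
   int  pos = (int)((rowix - 1) % brlen);  //-> 0 (0-based position within block)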
+       
+       /**
+        * Counts non-zeros per row via an accumulator, piggybacking on the
+        * row scan triggered by count() during dimension analysis.
+        */
+       private static class DataFrameAnalysisFunction implements Function<Row,Row>  
+       {       
+               private static final long serialVersionUID = 5705371332119770215L;
+               
+               private Accumulator<Double> _aNnz = null;
+               private boolean _containsID;
+               private boolean _isVector;
+               
+               public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean containsID, boolean isVector) {
+                       _aNnz = aNnz;
+                       _containsID = containsID;
+                       _isVector = isVector;
+               }
+
+               @Override
+               public Row call(Row arg0) throws Exception {
+                       //determine number of non-zeros of row
+                       int off = _containsID ? 1 : 0;
+                       long lnnz = 0;
+                       if( _isVector ) {
+                               //note: numNonzeros scans entries but handles sparse/dense
+                               Vector vec = (Vector) arg0.get(off);
+                               lnnz += vec.numNonzeros();
+                       }
+                       else { //row
+                               for(int i=off; i<arg0.length(); i++)
+                                       lnnz += UtilFunctions.isNonZero(arg0.get(i)) ? 1 : 0;
+                       }
+               
+                       //update counters
+                       _aNnz.add( (double)lnnz );
+                       return arg0;
+               }
+       }
+
+       /**
+        * Derives 0-based row positions from an existing 1-based ID column,
+        * which avoids both a sort and zipWithIndex.
+        */
+       private static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> 
+       {
+               private static final long serialVersionUID = 7438855241666363433L;
+
+               @Override
+               public Tuple2<Row, Long> call(Row arg0) throws Exception {
+                       //extract 1-based IDs and convert to 0-based positions
+                       long id = UtilFunctions.toLong(UtilFunctions.getDouble(arg0.get(0)));
+                       return new Tuple2<Row,Long>(arg0, id-1);
+               }
+       }
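A tiny example of the ID extraction step, with purely illustrative values:

   Row r = RowFactory.create(2.0, 0.5, 0.7); //__INDEX=2 plus two data columns
   long key = UtilFunctions.toLong(UtilFunctions.getDouble(r.get(0))) - 1; //-> 1, i.e., 0-based position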
+       
+       /**
+        * Assembles the row blocks of one matrix row into a single dataframe
+        * row, either as a dense vector or as individual double columns.
+        */
+       private static class ConvertRowBlocksToRows implements Function<Tuple2<Long, Iterable<Tuple2<Long, MatrixBlock>>>, Row> {
+               
+               private static final long serialVersionUID = 4441184411670316972L;
+               
+               private int _clen;
+               private int _bclen;
+               private boolean _toVector;
+               
+               public ConvertRowBlocksToRows(int clen, int bclen, boolean toVector) {
+                       _clen = clen;
+                       _bclen = bclen;
+                       _toVector = toVector;
+               }
+
+               @Override
+               public Row call(Tuple2<Long, Iterable<Tuple2<Long, MatrixBlock>>> arg0)
+                       throws Exception 
+               {
+                       Object[] row = new Object[_toVector ? 2 : _clen+1];
+                       row[0] = (double) arg0._1(); //row index
+                       
+                       //copy block data into target row
+                       if( _toVector ) {
+                               double[] tmp = new double[_clen];
+                               for(Tuple2<Long, MatrixBlock> kv : arg0._2()) {
+                                       int cl = (kv._1().intValue()-1)*_bclen;
+                                       MatrixBlock mb = kv._2();
+                                       DataConverter.copyToDoubleVector(mb, tmp, cl);
+                               }
+                               row[1] = new DenseVector(tmp);
+                       }
+                       else {
+                               for(Tuple2<Long, MatrixBlock> kv : arg0._2()) {
+                                       int cl = (kv._1().intValue()-1)*_bclen;
+                                       MatrixBlock mb = kv._2();
+                                       for( int j=0; j<mb.getNumColumns(); j++ )
+                                               row[cl+j+1] = mb.quickGetValue(0, j);
+                               }
+                       }
+                       
+                       return RowFactory.create(row);
+               }
+       }
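As an illustration of the produced row layout for a hypothetical 1 x 3 matrix row with index 1:

   //toVector=false: Row(1.0, v11, v12, v13)              -> columns __INDEX, C1, C2, C3
   //toVector=true:  Row(1.0, DenseVector(v11, v12, v13)) -> columns __INDEX, C1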
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index 34e5a91..641285f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -52,8 +52,6 @@ import java.nio.ByteOrder;
 
 import scala.Tuple2;
 
-import org.apache.sysml.api.MLOutput.ConvertDoubleArrayToRows;
-import org.apache.sysml.api.MLOutput.ProjectRows;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.functions.ConvertMatrixBlockToIJVLines;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
@@ -131,19 +129,6 @@ public class RDDConverterUtilsExt
        {
                return coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), input, mcIn, true);
        }
-       
-       // Useful for printing, testing binary blocked RDD and also for external use.
-       public static JavaRDD<String> binaryBlockToStringRDD(JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn, String format) throws DMLRuntimeException {
-               if(format.equals("text")) {
-                       JavaRDD<String> ijv = input.flatMap(new ConvertMatrixBlockToIJVLines(mcIn.getRowsPerBlock(), mcIn.getColsPerBlock()));
-                       return ijv;
-               }
-               else {
-                       throw new DMLRuntimeException("The output format:" + format + " is not implemented yet.");
-               }
-       }
-
-
 
        public static DataFrame stringDataFrameToVectorDataFrame(SQLContext sqlContext, DataFrame inputDF)
                        throws DMLRuntimeException {
@@ -207,78 +192,6 @@ public class RDDConverterUtilsExt
                return outDF;
        }
 
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> vectorDataFrameToBinaryBlock(SparkContext sc,
-                       DataFrame inputDF, MatrixCharacteristics mcOut, boolean containsID, String vectorColumnName) throws DMLRuntimeException {
-               return vectorDataFrameToBinaryBlock(new JavaSparkContext(sc), inputDF, mcOut, containsID, vectorColumnName);
-       }
-       
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> vectorDataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame inputDF, MatrixCharacteristics mcOut, boolean containsID, String vectorColumnName)
-                       throws DMLRuntimeException {
-               
-               if(containsID) {
-                       inputDF = dropColumn(inputDF.sort("__INDEX"), "__INDEX");
-               }
-               
-               DataFrame df = inputDF.select(vectorColumnName);
-                       
-               //determine unknown dimensions and sparsity if required
-               if( !mcOut.dimsKnown(true) ) {
-                       Accumulator<Double> aNnz = sc.accumulator(0L);
-                       JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, true));
-                       long rlen = tmp.count();
-                       long clen = ((Vector) tmp.first().get(0)).size();
-                       long nnz = UtilFunctions.toLong(aNnz.value());
-                       mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), nnz);
-               }
-               
-               JavaPairRDD<Row, Long> prepinput = df.javaRDD()
-                               .zipWithIndex(); //zip row index
-               
-               //convert csv rdd to binary block rdd (w/ partial blocks)
-               JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
-                               prepinput.mapPartitionsToPair(
-                                       new DataFrameToBinaryBlockFunction(mcOut, true));
-               
-               //aggregate partial matrix blocks
-               out = RDDAggregateUtils.mergeByKey( out ); 
-               
-               return out;
-       }
-       
-       /**
-        * Adding utility to support for dropping columns for older Spark versions.
-        * @param df
-        * @param column
-        * @return
-        * @throws DMLRuntimeException
-        */
-       public static DataFrame dropColumn(DataFrame df, String column) throws DMLRuntimeException {
-               ArrayList<String> columnToSelect = new ArrayList<String>();
-               String firstCol = null;
-               boolean colPresent = false;
-               for(String col : df.columns()) {
-                       if(col.equals(column)) {
-                               colPresent = true;
-                       }
-                       else if(firstCol == null) {
-                               firstCol = col;
-                       }
-                       else {
-                               columnToSelect.add(col);
-                       }
-               }
-               
-               if(!colPresent) {
-                       throw new DMLRuntimeException("The column \"" + column + "\" is not present in the dataframe.");
-               }
-               else if(firstCol == null) {
-                       throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe.");
-               }
-               
-               // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.drop("__INDEX");
-               return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
-       }
        
        public static DataFrame projectColumns(DataFrame df, ArrayList<String> columns) throws DMLRuntimeException {
                ArrayList<String> columnToSelect = new ArrayList<String>();
@@ -288,44 +201,6 @@ public class RDDConverterUtilsExt
                return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
        }
        }
        
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException {
-               return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, containsID, null);
-       }
-       
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, String [] columns) throws DMLRuntimeException {
-               ArrayList<String> columns1 = new ArrayList<String>(Arrays.asList(columns));
-               return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, false, columns1);
-       }
-       
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, ArrayList<String> columns) throws DMLRuntimeException {
-               return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, false, columns);
-       }
-       
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID, String [] columns) 
-                       throws DMLRuntimeException {
-               ArrayList<String> columns1 = new ArrayList<String>(Arrays.asList(columns));
-               return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, containsID, columns1);
-       }
-       
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(SparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID, ArrayList<String> columns) 
-                       throws DMLRuntimeException {
-               return dataFrameToBinaryBlock(new JavaSparkContext(sc), df, mcOut, containsID, columns);
-       }
-       
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException {
-               return dataFrameToBinaryBlock(sc, df, mcOut, containsID, null);
-       }
-       
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, ArrayList<String> columns) throws DMLRuntimeException {
-               return dataFrameToBinaryBlock(sc, df, mcOut, false, columns);
-       }
        
        public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen) throws DMLRuntimeException {
                return convertPy4JArrayToMB(data, rlen, clen, false);
@@ -386,77 +261,6 @@ public class RDDConverterUtilsExt
                return ret;
        }
        
-       /**
-        * Converts DataFrame into binary blocked RDD. 
-        * Note: mcOut will be set if you don't know the dimensions.
-        * @param sc
-        * @param df
-        * @param mcOut
-        * @param containsID
-        * @param columns
-        * @return
-        * @throws DMLRuntimeException
-        */
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID, ArrayList<String> columns) 
-                       throws DMLRuntimeException {
-               if(columns != null) {
-                       df = projectColumns(df, columns);
-               }
-               
-               if(containsID) {
-                       df = dropColumn(df.sort("__INDEX"), "__INDEX");
-               }
-                       
-               //determine unknown dimensions and sparsity if required
-               if( !mcOut.dimsKnown(true) ) {
-                       Accumulator<Double> aNnz = sc.accumulator(0L);
-                       JavaRDD<Row> tmp = df.javaRDD().map(new DataFrameAnalysisFunction(aNnz, false));
-                       long rlen = tmp.count();
-                       long clen = containsID ? (df.columns().length - 1) : df.columns().length;
-                       long nnz = UtilFunctions.toLong(aNnz.value());
-                       mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), nnz);
-               }
-               
-               JavaPairRDD<Row, Long> prepinput = df.javaRDD()
-                               .zipWithIndex(); //zip row index
-               
-               //convert csv rdd to binary block rdd (w/ partial blocks)
-               JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
-                               prepinput.mapPartitionsToPair(
-                                       new DataFrameToBinaryBlockFunction(mcOut, false));
-               
-               //aggregate partial matrix blocks
-               out = RDDAggregateUtils.mergeByKey( out ); 
-               
-               return out;
-       }
-       
-       public static DataFrame binaryBlockToVectorDataFrame(JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockRDD, 
-                       MatrixCharacteristics mc, SQLContext sqlContext) throws DMLRuntimeException {
-               long rlen = mc.getRows(); long clen = mc.getCols();
-               int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock();
-               // Very expensive operation here: groupByKey (where number of keys might be too large)
-               JavaRDD<Row> rowsRDD = binaryBlockRDD.flatMapToPair(new ProjectRows(rlen, clen, brlen, bclen))
-                               .groupByKey().map(new ConvertDoubleArrayToRows(clen, bclen, true));
-               
-               int numColumns = (int) clen;
-               if(numColumns <= 0) {
-                       throw new DMLRuntimeException("Output dimensions unknown after executing the script and hence cannot create the dataframe");
-               }
-               
-               List<StructField> fields = new ArrayList<StructField>();
-               // LongTypes throw an error: java.lang.Double incompatible with java.lang.Long
-               fields.add(DataTypes.createStructField("__INDEX", DataTypes.DoubleType, false));
-               fields.add(DataTypes.createStructField("C1", new VectorUDT(), false));
-               // fields.add(DataTypes.createStructField("C1", DataTypes.createArrayType(DataTypes.DoubleType), false));
-               
-               // This will cause infinite recursion due to bug in Spark
-               // https://issues.apache.org/jira/browse/SPARK-6999
-               // return sqlContext.createDataFrame(rowsRDD, colNames); // where ArrayList<String> colNames
-               return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
-       }
-       
        public static class AddRowID implements Function<Tuple2<Row,Long>, Row> {
                private static final long serialVersionUID = -3733816995375745659L;
 
@@ -492,33 +296,7 @@ public class RDDConverterUtilsExt
                return sqlContext.createDataFrame(newRows, new StructType(newSchema));
        }
        
-       public static DataFrame binaryBlockToDataFrame(JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockRDD, 
-                       MatrixCharacteristics mc, SQLContext sqlContext) throws DMLRuntimeException {
-               long rlen = mc.getRows(); long clen = mc.getCols();
-               int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock();
-               
-               // Very expensive operation here: groupByKey (where number of keys might be too large)
-               JavaRDD<Row> rowsRDD = binaryBlockRDD.flatMapToPair(new ProjectRows(rlen, clen, brlen, bclen))
-                               .groupByKey().map(new ConvertDoubleArrayToRows(clen, bclen, false));
-               
-               int numColumns = (int) clen;
-               if(numColumns <= 0) {
-                       // numColumns = rowsRDD.first().length() - 1; // Ugly, so instead prefer to throw
-                       throw new DMLRuntimeException("Output dimensions unknown after executing the script and hence cannot create the dataframe");
-               }
-               
-               List<StructField> fields = new ArrayList<StructField>();
-               // LongTypes throw an error: java.lang.Double incompatible with java.lang.Long
-               fields.add(DataTypes.createStructField("__INDEX", DataTypes.DoubleType, false));
-               for(int i = 1; i <= numColumns; i++) {
-                       fields.add(DataTypes.createStructField("C" + i, DataTypes.DoubleType, false));
-               }
-               
-               // This will cause infinite recursion due to bug in Spark
-               // https://issues.apache.org/jira/browse/SPARK-6999
-               // return sqlContext.createDataFrame(rowsRDD, colNames); // where ArrayList<String> colNames
-               return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
-       }
+       
        
        private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock> 
        {
@@ -535,131 +313,6 @@ public class RDDConverterUtilsExt
                }
 
        }
-
-       public static class DataFrameAnalysisFunction implements Function<Row,Row>  {
-               private static final long serialVersionUID = 5705371332119770215L;
-               private RowAnalysisFunctionHelper helper = null;
-               boolean isVectorBasedRDD;
-               public DataFrameAnalysisFunction( Accumulator<Double> aNnz, boolean isVectorBasedRDD) {
-                       helper = new RowAnalysisFunctionHelper(aNnz);
-                       this.isVectorBasedRDD = isVectorBasedRDD;
-               }
-
-               @Override
-               public Row call(Row arg0) throws Exception {
-                       if(isVectorBasedRDD)
-                               return helper.analyzeVector(arg0);
-                       else
-                               return helper.analyzeRow(arg0);
-               }
-               
-       }
-       
-       private static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text,Long>>,MatrixIndexes,MatrixBlock> {
-               private static final long serialVersionUID = 1501589201971233542L;
-               
-               private RowToBinaryBlockFunctionHelper helper = null; 
-               
-               public CSVToBinaryBlockFunction(MatrixCharacteristics mc, String delim, boolean fill, double fillValue) {
-                       helper = new RowToBinaryBlockFunctionHelper(mc, delim, fill, fillValue);
-               }
-               
-               @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<Text, Long>> arg0) throws Exception {
-                       return helper.convertToBinaryBlock(arg0, RDDConverterTypes.TEXT_TO_DOUBLEARR);
-               }
-               
-       }
-       
-       public static class DataFrameToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Row,Long>>,MatrixIndexes,MatrixBlock> {
-               private static final long serialVersionUID = 653447740362447236L;
-               private RowToBinaryBlockFunctionHelper helper = null; 
-               boolean isVectorBasedDF;
-               
-               public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, boolean isVectorBasedDF) {
-                       helper = new RowToBinaryBlockFunctionHelper(mc);
-                       this.isVectorBasedDF = isVectorBasedDF;
-               }
-               
-               @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
-                       if(isVectorBasedDF)
-                               return helper.convertToBinaryBlock(arg0, RDDConverterTypes.VECTOR_TO_DOUBLEARR);
-                       else
-                               return helper.convertToBinaryBlock(arg0, RDDConverterTypes.ROW_TO_DOUBLEARR);
-               }
-               
-       }
-       
-       private static class RowAnalysisFunctionHelper implements Serializable 
-       {
-               private static final long serialVersionUID = 2310303223289674477L;
-
-               private Accumulator<Double> _aNnz = null;
-               private String _delim = null;
-               
-               public RowAnalysisFunctionHelper( Accumulator<Double> aNnz ) {
-                       _aNnz = aNnz;
-               }
-               
-               public RowAnalysisFunctionHelper( Accumulator<Double> aNnz, String delim ) {
-                       _aNnz = aNnz;
-                       _delim = delim;
-               }
-               
-               public String analyzeText(Text v1) throws Exception {
-                       //parse input line
-                       String line = v1.toString();
-                       String[] cols = IOUtilFunctions.split(line, _delim);
-                       
-                       //determine number of non-zeros of row (w/o string parsing)
-                       long lnnz = 0;
-                       for( String col : cols ) {
-                               if( !col.isEmpty() && !col.equals("0") && !col.equals("0.0") ) {
-                                       lnnz++;
-                               }
-                       }
-                       
-                       //update counters
-                       _aNnz.add( (double)lnnz );
-                       
-                       return line;
-               }
-               
-               public Row analyzeRow(Row arg0) throws Exception {
-                       //determine number of non-zeros of row
-                       long lnnz = 0;
-                       if(arg0 != null) {
-                               for(int i = 0; i < arg0.length(); i++) {
-                                       if(RowToBinaryBlockFunctionHelper.getDoubleValue(arg0, i) != 0) {
-                                               lnnz++;
-                                       }
-                               }
-                       }
-                       else {
-                               throw new Exception("Error while analyzing row");
-                       }
-                       
-                       //update counters
-                       _aNnz.add( (double)lnnz );
-                       
-                       return arg0;
-               }
-               
-               public Row analyzeVector(Row row)  {
-                       Vector vec = (Vector) row.get(0); // assumption: 1 column DF
-                       long lnnz = 0;
-                       for(int i = 0; i < vec.size(); i++) {
-                               if(vec.apply(i) != 0) { 
-                                       lnnz++;
-                               }
-                       }
-                       
-                       //update counters
-                       _aNnz.add( (double)lnnz );
-                       return row;
-               }
-       }
        
        private static class IJVToBinaryBlockFunctionHelper implements Serializable {
                private static final long serialVersionUID = -7952801318564745821L;
@@ -768,178 +421,4 @@ public class RDDConverterUtilsExt
                        ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
                }
        }
-       
-       /**
-        * This functions allows to map rdd partitions of csv rows into a set of partial binary blocks.
-        * 
-        * NOTE: For this csv to binary block function, we need to hold all output blocks per partition 
-        * in-memory. Hence, we keep state of all column blocks and aggregate row segments into these blocks. 
-        * In terms of memory consumption this is better than creating partial blocks of row segments.
-        * 
-        */
-       private static class RowToBinaryBlockFunctionHelper implements Serializable 
-       {
-               private static final long serialVersionUID = -4948430402942717043L;
-               
-               private long _rlen = -1;
-               private long _clen = -1;
-               private int _brlen = -1;
-               private int _bclen = -1;
-               private String _delim = null;
-               private boolean _fill = false;
-               private double _fillValue = 0;
-               
-               public RowToBinaryBlockFunctionHelper(MatrixCharacteristics mc)
-               {
-                       _rlen = mc.getRows();
-                       _clen = mc.getCols();
-                       _brlen = mc.getRowsPerBlock();
-                       _bclen = mc.getColsPerBlock();
-               }
-               
-               public RowToBinaryBlockFunctionHelper(MatrixCharacteristics mc, String delim, boolean fill, double fillValue)
-               {
-                       _rlen = mc.getRows();
-                       _clen = mc.getCols();
-                       _brlen = mc.getRowsPerBlock();
-                       _bclen = mc.getColsPerBlock();
-                       _delim = delim;
-                       _fill = fill;
-                       _fillValue = fillValue;
-               }
-               
-               boolean emptyFound = false;
-               
-               // ----------------------------------------------------
-               public double[] textToDoubleArray(Text row) {
-                       String[] parts = IOUtilFunctions.split(row.toString(), _delim);
-                       double[] ret = new double[parts.length];
-                       int ix = 0;
-                       for(String part : parts) {
-                               emptyFound |= part.isEmpty() && !_fill;
-                               double val = (part.isEmpty() && _fill) ?
-                                               _fillValue : Double.parseDouble(part);
-                               ret[ix++] = val;
-                       }
-                       return ret;
-               }
-               public double[] rowToDoubleArray(Row row) throws Exception {
-                       double[] ret = new double[row.length()];
-                       for(int i = 0; i < row.length(); i++) {
-                               ret[i] = getDoubleValue(row, i);
-                       }
-                       return ret;
-               }
-               
-               public double[] vectorToDoubleArray(Vector arg) throws Exception {
-                       return arg.toDense().values();
-               }
-               // ----------------------------------------------------
-
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> convertToBinaryBlock(Object arg0, RDDConverterTypes converter) 
-                       throws Exception 
-               {
-                       ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
-
-                       int ncblks = (int)Math.ceil((double)_clen/_bclen);
-                       MatrixIndexes[] ix = new MatrixIndexes[ncblks];
-                       MatrixBlock[] mb = new MatrixBlock[ncblks];
-                       
-                       @SuppressWarnings("unchecked")
-                       Iterator<Tuple2<?,Long>> iter = (Iterator<Tuple2<?, Long>>) arg0;
-                       while( iter.hasNext() )
-                       {
-                               Tuple2<?,Long> tmp = iter.next();
-                               // String row = tmp._1();
-                               long rowix = tmp._2() + 1;
-                               
-                               long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
-                               int pos = UtilFunctions.computeCellInBlock(rowix, _brlen);
-                       
-                               //create new blocks for entire row
-                               if( ix[0] == null || ix[0].getRowIndex() != rix ) {
-                                       if( ix[0] !=null )
-                                               flushBlocksToList(ix, mb, ret);
-                                       long len = UtilFunctions.computeBlockSize(_rlen, rix, _brlen);
-                                       createBlocks(rowix, (int)len, ix, mb);
-                               }
-                               
-                               //process row data
-                               emptyFound = false;
-                               double[] parts = null;
-                               switch(converter) {
-                                       case TEXT_TO_DOUBLEARR:
-                                               parts = textToDoubleArray((Text) tmp._1());
-                                               break;
-                                       case ROW_TO_DOUBLEARR:
-                                               parts = rowToDoubleArray((Row) tmp._1());
-                                               break;
-                                       case VECTOR_TO_DOUBLEARR:
-                                               parts = vectorToDoubleArray((Vector) ((Row) tmp._1()).get(0));
-                                               break;
-                                       default:
-                                               throw new Exception("Invalid converter for row-based data:" + converter.toString());
-                               }
-                               
-                               for( int cix=1, pix=0; cix<=ncblks; cix++ ) 
-                               {
-                                       int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
-                                       for( int j=0; j<lclen; j++ ) {
-                                               double val = parts[pix++];
-                                               mb[cix-1].appendValue(pos, j, val);
-                                       }       
-                               }
-               
-                               //sanity check empty cells filled w/ values
-                               if(converter == RDDConverterTypes.TEXT_TO_DOUBLEARR)
-                                       IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(((Text) tmp._1()).toString(), _fill, emptyFound);
-                       }
-               
-                       //flush last blocks
-                       flushBlocksToList(ix, mb, ret);
-               
-                       return ret;
-               }
-                       
-               // Creates new state of empty column blocks for current global row index.
-               private void createBlocks(long rowix, int lrlen, MatrixIndexes[] ix, MatrixBlock[] mb)
-               {
-                       //compute row block index and number of column blocks
-                       long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
-                       int ncblks = (int)Math.ceil((double)_clen/_bclen);
-                       
-                       //create all column blocks (assume dense since csv is dense text format)
-                       for( int cix=1; cix<=ncblks; cix++ ) {
-                               int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
-                               ix[cix-1] = new MatrixIndexes(rix, cix);
-                               mb[cix-1] = new MatrixBlock(lrlen, lclen, false);               
-                       }
-               }
-               
-               // Flushes current state of filled column blocks to output list.
-               private void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
-                       throws DMLRuntimeException
-               {
-                       int len = ix.length;                    
-                       for( int i=0; i<len; i++ )
-                               if( mb[i] != null ) {
-                                       ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ix[i],mb[i]));
-                                       mb[i].examSparsity(); //ensure right representation
-                               }       
-               }
-               
-               public static double getDoubleValue(Row row, int index) throws Exception {
-                       try {
-                               return row.getDouble(index);
-                       } catch(Exception e) {
-                               try {
-                                       // Causes lock-contention for Java 7
-                                       return Double.parseDouble(row.get(index).toString());
-                               }
-                               catch(Exception e1) {
-                                       throw new Exception("Only double types are supported as input to SystemML. The input argument is \'" + row.get(index) + "\'");
-                               }
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index 65f3b1a..ae0fca7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -155,14 +155,16 @@ public class MatrixCharacteristics implements Serializable
                +", blocks ("+numRowsPerBlock+" x "+numColumnsPerBlock+")]";
        }
        
-       public void setDimension(long nr, long nc)
-       {
+       public void setDimension(long nr, long nc) {
                numRows = nr;
                numColumns = nc;
        }
        
-       public void setBlockSize(int bnr, int bnc)
-       {
+       public void setBlockSize(int blen) {
+               setBlockSize(blen, blen);
+       }
+       
+       public void setBlockSize(int bnr, int bnc) {
                numRowsPerBlock = bnr;
                numColumnsPerBlock = bnc;
        }
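The new single-argument overload is what the dataframe converter uses to repair invalid block sizes; a minimal sketch (the block size value is illustrative):

   MatrixCharacteristics mc = new MatrixCharacteristics();
   mc.setBlockSize(1000); //equivalent to mc.setBlockSize(1000, 1000)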

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index c790ae9..9bb27d9 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -904,6 +904,33 @@ public class DataConverter
        /**
         * 
         * @param mb
+        * @param dest
+        * @param destPos
+        */
+       public static void copyToDoubleVector( MatrixBlock mb, double[] dest, int destPos )
+       {
+               if( mb.isEmptyBlock(false) )
+                       return; //quick path
+                       
+               int rows = mb.getNumRows();
+               int cols = mb.getNumColumns();
+               
+               if( mb.isInSparseFormat() ) {
+                       Iterator<IJV> iter = mb.getSparseBlockIterator();
+                       while( iter.hasNext() ) {
+                               IJV cell = iter.next();
+                               dest[destPos+cell.getI()*cols+cell.getJ()] = cell.getV();
+                       }
+               }
+               else {
+                       //memcopy row major representation if at least 1 non-zero
+                       System.arraycopy(mb.getDenseBlock(), 0, dest, destPos, rows*cols);
+               }
+       }
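A small usage sketch of the new helper, mirroring how ConvertRowBlocksToRows assembles a row; the 1 x 2 block contents are illustrative:

   MatrixBlock mb = new MatrixBlock(1, 2, false);
   mb.quickSetValue(0, 0, 3.0);
   mb.quickSetValue(0, 1, 4.0);
   double[] dest = new double[4];
   DataConverter.copyToDoubleVector(mb, dest, 2); //dest now {0.0, 0.0, 3.0, 4.0}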
+       
+       /**
+        * 
+        * @param mb
         * @return
         */
        public static String toString(MatrixBlock mb) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index fa17fcd..1ac552f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -601,13 +601,12 @@ public class UtilFunctions
        }
        
        
-       /*
+       /**
         * This function will return the data type, i.e., whether it is a Matrix or Frame.
         * 
         *  @param      str
         *              Instruction string to execute
         */
-       
        public static DataType getDataType(String str, int index)
        {
                String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
@@ -615,4 +614,29 @@ public class UtilFunctions
        
                return in1.getDataType();
        }
+       
+       /**
+        * Returns the double value of an object, casting if it already is a
+        * Double and parsing its string representation otherwise.
+        * 
+        * @param obj
+        * @return
+        */
+       public static double getDouble(Object obj) {
+               return (obj instanceof Double) ? (Double)obj :
+                       Double.parseDouble(obj.toString());
+       }
+       
+       /**
+        * Checks if an object represents a non-zero value, without expensive
+        * double parsing for the common string encodings "0" and "0.0".
+        * 
+        * @param obj
+        * @return
+        */
+       public static boolean isNonZero(Object obj) {
+               if( obj instanceof Double ) 
+                       return ((Double) obj) != 0;
+               else {
+                       //avoid expensive double parsing
+                       String sobj = obj.toString();
+                       return (!sobj.equals("0") && !sobj.equals("0.0"));
+               }
+       }
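A brief sketch of why these helpers matter on the converter hot path: the old getDoubleValue tried row.getDouble and re-parsed inside the exception handler, i.e., double parsing per failing cell, whereas (values illustrative):

   Object cell = "1.5"; //string-typed dataframe cell
   double v   = UtilFunctions.getDouble(cell);  //single parse, no exception control flow
   boolean nz = UtilFunctions.isNonZero(cell);  //no parsing for "0"/"0.0" encodings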
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
new file mode 100644
index 0000000..c19865c
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameConversionTest extends AutomatedTestBase 
+{
+       private final static String TEST_DIR = "functions/mlcontext/";
+       private final static String TEST_NAME = "DataFrameConversion";
+       private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameConversionTest.class.getSimpleName() + "/";
+
+       private final static int  rows1 = 2245;
+       private final static int  cols1 = 745;
+       private final static int  cols2 = 1264;
+       private final static double sparsity1 = 0.9;
+       private final static double sparsity2 = 0.1;
+       private final static double eps=0.0000000001;
+
+
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+       }
+       
+       @Test
+       public void testVectorConversionSingleDense() {
+               testDataFrameConversion(true, true, true, false);
+       }
+       
+       @Test
+       public void testVectorConversionSingleDenseUnknown() {
+               testDataFrameConversion(true, true, true, true);
+       }
+       
+       @Test
+       public void testVectorConversionSingleSparse() {
+               testDataFrameConversion(true, true, false, false);
+       }
+       
+       @Test
+       public void testVectorConversionSingleSparseUnknown() {
+               testDataFrameConversion(true, true, false, true);
+       }
+       
+       @Test
+       public void testVectorConversionMultiDense() {
+               testDataFrameConversion(true, false, true, false);
+       }
+       
+       @Test
+       public void testVectorConversionMultiDenseUnknown() {
+               testDataFrameConversion(true, false, true, true);
+       }
+       
+       @Test
+       public void testVectorConversionMultiSparse() {
+               testDataFrameConversion(true, false, false, false);
+       }
+       
+       @Test
+       public void testVectorConversionMultiSparseUnknown() {
+               testDataFrameConversion(true, false, false, true);
+       }
+
+       @Test
+       public void testRowConversionSingleDense() {
+               testDataFrameConversion(false, true, true, false);
+       }
+       
+       @Test
+       public void testRowConversionSingleDenseUnknown() {
+               testDataFrameConversion(false, true, true, true);
+       }
+       
+       @Test
+       public void testRowConversionSingleSparse() {
+               testDataFrameConversion(false, true, false, false);
+       }
+       
+       @Test
+       public void testRowConversionSingleSparseUnknown() {
+               testDataFrameConversion(false, true, false, true);
+       }
+       
+       @Test
+       public void testRowConversionMultiDense() {
+               testDataFrameConversion(false, false, true, false);
+       }
+       
+       @Test
+       public void testRowConversionMultiDenseUnknown() {
+               testDataFrameConversion(false, false, true, true);
+       }
+       
+       @Test
+       public void testRowConversionMultiSparse() {
+               testDataFrameConversion(false, false, false, false);
+       }
+       
+       @Test
+       public void testRowConversionMultiSparseUnknown() {
+               testDataFrameConversion(false, false, false, true);
+       }
+       
+       /**
+        * Round-trip test: matrix to dataframe and back, for all combinations
+        * of vector/row layout, single/multi column blocks, dense/sparse data,
+        * and known/unknown dimensions.
+        * 
+        * @param vector
+        * @param singleColBlock
+        * @param dense
+        * @param unknownDims
+        */
+       private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
+               boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+               RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+               SparkExecutionContext sec = null;
+               
+               try
+               {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+                       DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+                       
+                       //generate input data and setup metadata
+                       int cols = singleColBlock ? cols1 : cols2;
+                       double sparsity = dense ? sparsity1 : sparsity2; 
+                       double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
+                       MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
+                       int blksz = ConfigurationManager.getBlocksize();
+                       MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+                       MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+                       
+                       //setup spark context
+                       sec = (SparkExecutionContext) ExecutionContextFactory.createContext();          
+                       JavaSparkContext sc = sec.getSparkContext();
+                       SQLContext sqlctx = new SQLContext(sc);
+                       
+                       //get binary block input rdd
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
+                       
+                       //matrix - dataframe - matrix conversion
+                       DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
+                       
+                       //get output matrix block
+                       MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+                       
+                       //compare matrix blocks
+                       double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+                       TestUtils.compareMatrices(A, B, rows1, cols, eps);
+               }
+               catch( Exception ex ) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       if( sec != null ) //guard against failed context creation
+                               sec.close();
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+                       DMLScript.rtplatform = oldPlatform;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/085009a3/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
index 749f8c1..7b68065 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/GNMFTest.java
@@ -50,6 +50,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.utils.TestUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt;
 import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
 import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
@@ -201,7 +202,7 @@ public class GNMFTest extends AutomatedTestBase
                                MatrixCharacteristics mcW = out.getMatrixCharacteristics("W");
                                CoordinateMatrix coordinateMatrix = new CoordinateMatrix(matRDD.rdd(), mcW.getRows(), mcW.getCols());
                                JavaPairRDD<MatrixIndexes, MatrixBlock> binaryRDD = RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(sc, coordinateMatrix, mcW, true);
-                               JavaRDD<String> wOut = RDDConverterUtilsExt.binaryBlockToStringRDD(binaryRDD, mcW, "text");
+                               JavaRDD<String> wOut = RDDConverterUtils.binaryBlockToTextCell(binaryRDD, mcW);
                                
                                String fName = output("w");
                                try {
