Repository: incubator-systemml
Updated Branches:
  refs/heads/master bbc77e71e -> e7510a03b


[SYSTEMML-1280] Restore and deprecate SQLContext methods

Restore the Java SQLContext method signatures (migrated to SparkSession
by SYSTEMML-1194) for any users who still rely on the old SystemML
methods and are unable to move to SparkSession.
Applies to the old MLContext, MLMatrix, MLOutput, FrameRDDConverterUtils,
RDDConverterUtils, and RDDConverterUtilsExt.

Closes #396.
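
For illustration, a minimal sketch of how the restored signatures keep
existing SQLContext-based code working. The path, format, and class
names below are hypothetical; each restored method simply bridges to a
SparkSession via sqlContext.sparkSession() and delegates to the new
signature.

    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.SQLContext;
    import org.apache.sysml.api.MLContext;
    import org.apache.sysml.api.MLMatrix;

    public class OldSqlContextExample {
        public static void main(String[] args) throws Exception {
            SparkContext sc = SparkContext.getOrCreate();
            // SQLContext is deprecated in Spark 2.x but still available
            SQLContext sqlContext = new SQLContext(sc);
            MLContext ml = new MLContext(sc);
            // Restored overload: takes a SQLContext instead of a SparkSession
            MLMatrix m = ml.read(sqlContext, "/tmp/m.csv", "csv");
            m.show(); // MLMatrix extends Dataset<Row>
        }
    }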


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e7510a03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e7510a03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e7510a03

Branch: refs/heads/master
Commit: e7510a03b95929b34e2fe6632c640851816716ea
Parents: bbc77e7
Author: Deron Eriksson <[email protected]>
Authored: Thu Feb 16 15:12:36 2017 -0800
Committer: Deron Eriksson <[email protected]>
Committed: Thu Feb 16 15:12:36 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    | 18 +++++-
 .../java/org/apache/sysml/api/MLMatrix.java     | 20 +++++-
 .../java/org/apache/sysml/api/MLOutput.java     | 66 +++++++++++++++++++-
 .../spark/utils/FrameRDDConverterUtils.java     |  9 +++
 .../spark/utils/RDDConverterUtils.java          |  9 +++
 .../spark/utils/RDDConverterUtilsExt.java       | 17 ++++-
 6 files changed, 133 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index a128c37..a5dc859 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -37,6 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.api.jmlc.JMLCUtils;
@@ -1593,5 +1594,20 @@ public class MLContext {
                JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = out.getBinaryBlockedRDD("output");
                MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output");
                return MLMatrix.createMLMatrix(this, sparkSession, blocks, mcOut);
-       }       
+       }
+
+       /**
+        * Experimental API: may be discontinued in a future release
+        * @param sqlContext the SQL Context
+        * @param filePath the file path
+        * @param format the format
+        * @return the MLMatrix
+        * @throws IOException if IOException occurs
+        * @throws DMLException if DMLException occurs
+        * @throws ParseException if ParseException occurs
+        */
+       public MLMatrix read(SQLContext sqlContext, String filePath, String format) throws IOException, DMLException, ParseException {
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return read(sparkSession, filePath, format);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java
index 873e831..45f631f 100644
--- a/src/main/java/org/apache/sysml/api/MLMatrix.java
+++ b/src/main/java/org/apache/sysml/api/MLMatrix.java
@@ -25,6 +25,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
@@ -76,11 +77,21 @@ public class MLMatrix extends Dataset<Row> {
                this.ml = ml;
        }
 
+       protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) {
+               super(sqlContext, logicalPlan, RowEncoder.apply(null));
+               this.ml = ml;
+       }
+
        protected MLMatrix(SparkSession sparkSession, QueryExecution queryExecution, MLContext ml) {
                super(sparkSession, queryExecution, RowEncoder.apply(null));
                this.ml = ml;
        }
-       
+
+       protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) {
+               super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null));
+               this.ml = ml;
+       }
+
        // Only used internally to set a new MLMatrix after one of the matrix operations.
        // Not to be used externally.
        protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException {
@@ -110,7 +121,12 @@ public class MLMatrix extends Dataset<Row> {
                StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
                return new MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
        }
-       
+
+       static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException {
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return createMLMatrix(ml, sparkSession, blocks, mc);
+       }
+
        /**
         * Convenience method to write an MLMatrix.
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index ca90fc9..a16eccd 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -26,6 +26,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -107,7 +108,24 @@ public class MLOutput {
                }
                throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
        }
-       
+
+       /**
+        * Note, the output DataFrame has an additional ID column.
+        * An easy way to get the DataFrame without the ID column is df.drop("__INDEX").
+        * 
+        * @param sqlContext the SQL Context
+        * @param varName the variable name
+        * @return the DataFrame
+        * @throws DMLRuntimeException if DMLRuntimeException occurs
+        */
+       public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException {
+               if (sqlContext == null) {
+                       throw new DMLRuntimeException("SQLContext is not created");
+               }
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return getDF(sparkSession, varName);
+       }
+
        /**
         * Obtain the DataFrame
         * 
@@ -134,7 +152,24 @@ public class MLOutput {
                }
                
        }
-       
+
+       /**
+        * Obtain the DataFrame
+        * 
+        * @param sqlContext the SQL Context
+        * @param varName the variable name
+        * @param outputVector if true, returns a DataFrame with two columns: ID and org.apache.spark.ml.linalg.Vector
+        * @return the DataFrame
+        * @throws DMLRuntimeException if DMLRuntimeException occurs
+        */
+       public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException {
+               if (sqlContext == null) {
+                       throw new DMLRuntimeException("SQLContext is not created");
+               }
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return getDF(sparkSession, varName, outputVector);
+       }
+
        /**
         * This method improves the performance of MLPipeline wrappers.
         * 
@@ -153,6 +188,25 @@ public class MLOutput {
                JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName);
                return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockRDD, mc, true);
        }
+
+       /**
+        * This method improves the performance of MLPipeline wrappers.
+        * 
+        * @param sqlContext the SQL Context
+        * @param varName the variable name
+        * @param mc the matrix characteristics
+        * @return the DataFrame
+        * @throws DMLRuntimeException if DMLRuntimeException occurs
+        */
+       public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc)
+               throws DMLRuntimeException
+       {
+               if (sqlContext == null) {
+                       throw new DMLRuntimeException("SQLContext is not created");
+               }
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return getDF(sparkSession, varName, mc);
+       }
        
        public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException {
                if(format.equals("text")) {
@@ -202,4 +256,12 @@ public class MLOutput {
                }
                throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table.");
        }
+       
+       public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {
+               if (sqlContext == null) {
+                       throw new DMLRuntimeException("SQLContext is not created");
+               }
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return getMLMatrix(ml, sparkSession, varName);
+       }
 }
\ No newline at end of file
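
As a usage note on the restored MLOutput overloads, a minimal hedged
sketch; "out" is assumed to be an MLOutput from an earlier
MLContext.executeScript call, "sqlContext" an existing SQLContext, and
"output" a registered output variable name:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Hypothetical: the returned DataFrame carries the extra "__INDEX"
    // ID column, which can be dropped when the row index is not needed.
    Dataset<Row> df = out.getDF(sqlContext, "output");
    Dataset<Row> noId = df.drop("__INDEX");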

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3d5df56..013a1a8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
@@ -285,6 +286,14 @@ public class FrameRDDConverterUtils
                return sparkSession.createDataFrame(rowRDD, dfSchema);
        }
        
+       @Deprecated
+       public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext, JavaPairRDD<Long,FrameBlock> in,
+                       MatrixCharacteristics mc, ValueType[] schema)
+       {
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return binaryBlockToDataFrame(sparkSession, in, mc, schema);
+       }
+       
        
        /**
         * This function will convert Frame schema into DataFrame schema 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 6b6c61d..e847471 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.Vectors;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
@@ -286,6 +287,14 @@ public class RDDConverterUtils
                return sparkSession.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
        }
 
+       @Deprecated
+       public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext,
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)
+       {
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return binaryBlockToDataFrame(sparkSession, in, mc, toVector);
+       }
+
        public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in)
        {
                return in.mapToPair(new TextToSerTextFunction());
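
A hedged sketch of calling the restored converter overload; "blocks",
"mc", and "sqlContext" are assumed to come from earlier conversion
steps:

    // Hypothetical: convert a binary-block matrix RDD to a DataFrame
    // through the deprecated SQLContext overload, which bridges to
    // sqlContext.sparkSession() internally. With toVector=true the row
    // values are packed into a single ml.linalg.Vector column.
    Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(sqlContext, blocks, mc, true);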

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index f4a02dd..973db64 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -236,7 +236,22 @@ public class RDDConverterUtilsExt
                JavaRDD<Row> newRows = df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID());
                return sparkSession.createDataFrame(newRows, new StructType(newSchema));
        }
-       
+
+       /**
+        * Add element indices as a new column to a DataFrame
+        * 
+        * @param df input data frame
+        * @param sqlContext the SQL Context
+        * @param nameOfCol name of index column
+        * @return new data frame
+        * 
+        * @deprecated This will be removed in SystemML 1.0.
+        */
+       @Deprecated
+       public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) {
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return addIDToDataFrame(df, sparkSession, nameOfCol);
+       }
        
        
        private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>
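
And a usage sketch for the restored addIDToDataFrame overload; "df" and
"sqlContext" are assumed to exist, and the column name "id" is a
placeholder:

    // Hypothetical: append a row-index column named "id" via the
    // deprecated SQLContext overload, which delegates to the
    // SparkSession variant shown above.
    Dataset<Row> withId = RDDConverterUtilsExt.addIDToDataFrame(df, sqlContext, "id");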
