Repository: incubator-systemml Updated Branches: refs/heads/master 9820f4c52 -> 9c19b4771
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9c19b477/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java index c8d3450..78d4968 100644 --- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java +++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextTest.java @@ -42,7 +42,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -61,13 +60,13 @@ import org.apache.sysml.api.mlcontext.BinaryBlockMatrix; import org.apache.sysml.api.mlcontext.MLContext; import org.apache.sysml.api.mlcontext.MLContextConversionUtil; import org.apache.sysml.api.mlcontext.MLContextException; +import org.apache.sysml.api.mlcontext.MLContextUtil; import org.apache.sysml.api.mlcontext.MLResults; import org.apache.sysml.api.mlcontext.MatrixFormat; import org.apache.sysml.api.mlcontext.MatrixMetadata; import org.apache.sysml.api.mlcontext.Script; import org.apache.sysml.api.mlcontext.ScriptExecutor; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.test.integration.AutomatedTestBase; import org.junit.After; @@ -86,18 +85,15 @@ public class MLContextTest extends AutomatedTestBase { protected final static String TEST_DIR = "org/apache/sysml/api/mlcontext"; protected final static String TEST_NAME = "MLContext"; - private static SparkConf conf; + private static SparkSession spark; private static JavaSparkContext sc; private static MLContext ml; @BeforeClass public static void setUpClass() { - if (conf == null) - conf = SparkExecutionContext.createSystemMLSparkConf() - .setAppName("MLContextTest").setMaster("local"); - if (sc == null) - sc = new JavaSparkContext(conf); - ml = new MLContext(sc); + spark = createSystemMLSparkSession("MLContextTest", "local"); + ml = new MLContext(spark); + sc = MLContextUtil.getJavaSparkContext(ml); } @Override @@ -513,13 +509,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES); @@ -539,13 +534,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES); @@ -565,14 +559,13 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX); @@ -592,14 +585,13 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX); @@ -619,14 +611,13 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX); @@ -646,14 +637,13 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_DOUBLES_WITH_INDEX); @@ -673,12 +663,11 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX); @@ -698,12 +687,11 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX); @@ -723,12 +711,11 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> javaRddTuple = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleMllibVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX); @@ -748,12 +735,11 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Tuple2<Double, org.apache.spark.mllib.linalg.Vector>> javaRddTuple = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleMllibVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR_WITH_INDEX); @@ -773,11 +759,10 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Vector> javaRddVector = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR); @@ -797,11 +782,10 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Vector> javaRddVector = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR); @@ -821,11 +805,10 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<org.apache.spark.mllib.linalg.Vector> javaRddVector = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddVector.map(new MllibVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR); @@ -845,11 +828,10 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<org.apache.spark.mllib.linalg.Vector> javaRddVector = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddVector.map(new MllibVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", new org.apache.spark.mllib.linalg.VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(MatrixFormat.DF_VECTOR); @@ -1677,13 +1659,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); BinaryBlockMatrix binaryBlockMatrix = new BinaryBlockMatrix(dataFrame); Script script = dml("avg = avg(M);").in("M", binaryBlockMatrix).out("avg"); @@ -1702,13 +1683,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.StringType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); BinaryBlockMatrix binaryBlockMatrix = new BinaryBlockMatrix(dataFrame); Script script = pydml("avg = avg(M)").in("M", binaryBlockMatrix).out("avg"); @@ -1971,13 +1951,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(3, 3, 9); @@ -1997,13 +1976,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); MatrixMetadata mm = new MatrixMetadata(3, 3, 9); @@ -2187,13 +2165,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame); setExpectedStdOut("sum: 27.0"); @@ -2211,13 +2188,12 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame); setExpectedStdOut("sum: 27.0"); @@ -2235,14 +2211,13 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame); setExpectedStdOut("sum: 27.0"); @@ -2260,14 +2235,13 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C2", DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C3", DataTypes.DoubleType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = pydml("print('sum: ' + sum(M))").in("M", dataFrame); setExpectedStdOut("sum: 27.0"); @@ -2285,12 +2259,11 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame); setExpectedStdOut("sum: 45.0"); @@ -2308,12 +2281,11 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Tuple2<Double, Vector>> javaRddTuple = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddTuple.map(new DoubleVectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, DataTypes.DoubleType, true)); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = dml("print('sum: ' + sum(M))").in("M", dataFrame); setExpectedStdOut("sum: 45.0"); @@ -2331,11 +2303,10 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Vector> javaRddVector = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = dml("print('sum: ' + sum(M));").in("M", dataFrame); setExpectedStdOut("sum: 45.0"); @@ -2353,11 +2324,10 @@ public class MLContextTest extends AutomatedTestBase { JavaRDD<Vector> javaRddVector = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddVector.map(new VectorRow()); - SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", new VectorUDT(), true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> dataFrame = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> dataFrame = spark.createDataFrame(javaRddRow, schema); Script script = dml("print('sum: ' + sum(M))").in("M", dataFrame); setExpectedStdOut("sum: 45.0"); @@ -2800,11 +2770,11 @@ public class MLContextTest extends AutomatedTestBase { @AfterClass public static void tearDownClass() { - // stop spark context to allow single jvm tests (otherwise the + // stop underlying spark context to allow single jvm tests (otherwise the // next test that tries to create a SparkContext would fail) - sc.stop(); + spark.stop(); sc = null; - conf = null; + spark = null; // clear status mlcontext and spark exec context ml.close();
