Repository: spark Updated Branches: refs/heads/master bcfee153b -> ed0b4070f
http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 0ba72b0..0f416eb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -177,23 +177,23 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te (Seq[Integer](3), null, null)).toDF("key", "value1", "value2") data3.write.saveAsTable("agg3") - val emptyDF = sqlContext.createDataFrame( + val emptyDF = spark.createDataFrame( sparkContext.emptyRDD[Row], StructType(StructField("key", StringType) :: StructField("value", IntegerType) :: Nil)) emptyDF.registerTempTable("emptyTable") // Register UDAFs - sqlContext.udf.register("mydoublesum", new MyDoubleSum) - sqlContext.udf.register("mydoubleavg", new MyDoubleAvg) - sqlContext.udf.register("longProductSum", new LongProductSum) + spark.udf.register("mydoublesum", new MyDoubleSum) + spark.udf.register("mydoubleavg", new MyDoubleAvg) + spark.udf.register("longProductSum", new LongProductSum) } override def afterAll(): Unit = { try { - sqlContext.sql("DROP TABLE IF EXISTS agg1") - sqlContext.sql("DROP TABLE IF EXISTS agg2") - sqlContext.sql("DROP TABLE IF EXISTS agg3") - sqlContext.dropTempTable("emptyTable") + spark.sql("DROP TABLE IF EXISTS agg1") + spark.sql("DROP TABLE IF EXISTS agg2") + spark.sql("DROP TABLE IF EXISTS agg3") + spark.catalog.dropTempTable("emptyTable") } finally { super.afterAll() } @@ -210,7 +210,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("empty table") { // If there is no GROUP BY clause and the table is empty, we will generate a single row. checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | AVG(value), @@ -227,7 +227,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(null, 0, 0, 0, null, null, null, null, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | AVG(value), @@ -246,7 +246,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te // If there is a GROUP BY clause and the table is empty, there is no output. checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | AVG(value), @@ -266,7 +266,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("null literal") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | AVG(null), @@ -282,7 +282,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("only do grouping") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT key |FROM agg1 @@ -291,7 +291,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT DISTINCT value1, key |FROM agg2 @@ -308,7 +308,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(null, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT value1, key |FROM agg2 @@ -326,7 +326,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(null, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT DISTINCT key |FROM agg3 @@ -341,7 +341,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(Seq[Integer](3)) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT value1, key |FROM agg3 @@ -363,7 +363,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("case in-sensitive resolution") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT avg(value), kEY - 100 |FROM agg1 @@ -372,7 +372,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(20.0, -99) :: Row(-0.5, -98) :: Row(null, -97) :: Row(10.0, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT sum(distinct value1), kEY - 100, count(distinct value1) |FROM agg2 @@ -381,7 +381,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(40, -99, 2) :: Row(0, -98, 2) :: Row(null, -97, 0) :: Row(30, null, 3) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT valUe * key - 100 |FROM agg1 @@ -397,7 +397,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("test average no key in output") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT avg(value) |FROM agg1 @@ -408,7 +408,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("test average") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT key, avg(value) |FROM agg1 @@ -417,7 +417,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(1, 20.0) :: Row(2, -0.5) :: Row(3, null) :: Row(null, 10.0) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT key, mean(value) |FROM agg1 @@ -426,7 +426,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(1, 20.0) :: Row(2, -0.5) :: Row(3, null) :: Row(null, 10.0) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT avg(value), key |FROM agg1 @@ -435,7 +435,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(20.0, 1) :: Row(-0.5, 2) :: Row(null, 3) :: Row(10.0, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT avg(value) + 1.5, key + 10 |FROM agg1 @@ -444,7 +444,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(21.5, 11) :: Row(1.0, 12) :: Row(null, 13) :: Row(11.5, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT avg(value) FROM agg1 """.stripMargin), @@ -456,7 +456,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te // deterministic. withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | first_valUE(key), @@ -472,7 +472,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(null, 3, null, 3, 1, 3, 1, 3) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | first_valUE(key), @@ -491,7 +491,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("udaf") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | key, @@ -511,7 +511,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("interpreted aggregate function") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT mydoublesum(value), key |FROM agg1 @@ -520,14 +520,14 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(60.0, 1) :: Row(-1.0, 2) :: Row(null, 3) :: Row(30.0, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT mydoublesum(value) FROM agg1 """.stripMargin), Row(89.0) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT mydoublesum(null) """.stripMargin), @@ -536,7 +536,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("interpreted and expression-based aggregation functions") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT mydoublesum(value), key, avg(value) |FROM agg1 @@ -548,7 +548,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(30.0, null, 10.0) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | mydoublesum(value + 1.5 * key), @@ -568,7 +568,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("single distinct column set") { // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword. checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | min(distinct value1), @@ -581,7 +581,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(-60, 70.0, 101.0/9.0, 5.6, 100)) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | mydoubleavg(distinct value1), @@ -600,7 +600,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | key, @@ -618,7 +618,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(null, 110.0, 60.0, 30.0, 110.0, 110.0) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | count(value1), @@ -637,7 +637,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("single distinct multiple columns set") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | key, @@ -653,7 +653,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("multiple distinct multiple columns sets") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | key, @@ -681,7 +681,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("test count") { checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | count(value2), @@ -704,7 +704,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(0, null, 1, 1, null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT | count(value2), @@ -786,28 +786,28 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te covar_tab.registerTempTable("covar_tab") checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT corr(b, c) FROM covar_tab WHERE a < 1 """.stripMargin), Row(null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT corr(b, c) FROM covar_tab WHERE a < 3 """.stripMargin), Row(null) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT corr(b, c) FROM covar_tab WHERE a = 3 """.stripMargin), Row(Double.NaN) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT a, corr(b, c) FROM covar_tab GROUP BY a ORDER BY a """.stripMargin), @@ -818,7 +818,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(5, Double.NaN) :: Row(6, Double.NaN) :: Nil) - val corr7 = sqlContext.sql("SELECT corr(b, c) FROM covar_tab").collect()(0).getDouble(0) + val corr7 = spark.sql("SELECT corr(b, c) FROM covar_tab").collect()(0).getDouble(0) assert(math.abs(corr7 - 0.6633880657639323) < 1e-12) } @@ -852,7 +852,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } test("no aggregation function (SPARK-11486)") { - val df = sqlContext.range(20).selectExpr("id", "repeat(id, 1) as s") + val df = spark.range(20).selectExpr("id", "repeat(id, 1) as s") .groupBy("s").count() .groupBy().count() checkAnswer(df, Row(20) :: Nil) @@ -906,8 +906,8 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te } // Create a DF for the schema with random data. - val rdd = sqlContext.sparkContext.parallelize(data, 1) - val df = sqlContext.createDataFrame(rdd, schema) + val rdd = spark.sparkContext.parallelize(data, 1) + val df = spark.createDataFrame(rdd, schema) val allColumns = df.schema.fields.map(f => col(f.name)) val expectedAnswer = @@ -924,7 +924,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te test("udaf without specifying inputSchema") { withTempTable("noInputSchemaUDAF") { - sqlContext.udf.register("noInputSchema", new ScalaAggregateFunctionWithoutInputSchema) + spark.udf.register("noInputSchema", new ScalaAggregateFunctionWithoutInputSchema) val data = Row(1, Seq(Row(1), Row(2), Row(3))) :: @@ -935,13 +935,13 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te StructField("key", IntegerType) :: StructField("myArray", ArrayType(StructType(StructField("v", IntegerType) :: Nil))) :: Nil) - sqlContext.createDataFrame( + spark.createDataFrame( sparkContext.parallelize(data, 2), schema) .registerTempTable("noInputSchemaUDAF") checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT key, noInputSchema(myArray) |FROM noInputSchemaUDAF @@ -950,7 +950,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te Row(1, 21) :: Row(2, -10) :: Nil) checkAnswer( - sqlContext.sql( + spark.sql( """ |SELECT noInputSchema(myArray) |FROM noInputSchemaUDAF @@ -976,7 +976,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue // Create a new df to make sure its physical operator picks up // spark.sql.TungstenAggregate.testFallbackStartsAt. // todo: remove it? - val newActual = Dataset.ofRows(sqlContext.sparkSession, actual.logicalPlan) + val newActual = Dataset.ofRows(spark, actual.logicalPlan) QueryTest.checkAnswer(newActual, expectedAnswer) match { case Some(errorMessage) => http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 0f23949..6dcc404 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -36,7 +36,7 @@ class HiveDDLSuite override def afterEach(): Unit = { try { // drop all databases, tables and functions after each test - sqlContext.sessionState.catalog.reset() + spark.sessionState.catalog.reset() } finally { super.afterEach() } @@ -212,7 +212,7 @@ class HiveDDLSuite test("drop views") { withTable("tab1") { val tabName = "tab1" - sqlContext.range(10).write.saveAsTable("tab1") + spark.range(10).write.saveAsTable("tab1") withView("view1") { val viewName = "view1" @@ -233,7 +233,7 @@ class HiveDDLSuite test("alter views - rename") { val tabName = "tab1" withTable(tabName) { - sqlContext.range(10).write.saveAsTable(tabName) + spark.range(10).write.saveAsTable(tabName) val oldViewName = "view1" val newViewName = "view2" withView(oldViewName, newViewName) { @@ -252,7 +252,7 @@ class HiveDDLSuite test("alter views - set/unset tblproperties") { val tabName = "tab1" withTable(tabName) { - sqlContext.range(10).write.saveAsTable(tabName) + spark.range(10).write.saveAsTable(tabName) val viewName = "view1" withView(viewName) { val catalog = hiveContext.sessionState.catalog @@ -290,7 +290,7 @@ class HiveDDLSuite test("alter views and alter table - misuse") { val tabName = "tab1" withTable(tabName) { - sqlContext.range(10).write.saveAsTable(tabName) + spark.range(10).write.saveAsTable(tabName) val oldViewName = "view1" val newViewName = "view2" withView(oldViewName, newViewName) { @@ -354,7 +354,7 @@ class HiveDDLSuite test("drop view using drop table") { withTable("tab1") { - sqlContext.range(10).write.saveAsTable("tab1") + spark.range(10).write.saveAsTable("tab1") withView("view1") { sql("CREATE VIEW view1 AS SELECT * FROM tab1") val message = intercept[AnalysisException] { @@ -383,7 +383,7 @@ class HiveDDLSuite } private def createDatabaseWithLocation(tmpDir: File, dirExists: Boolean): Unit = { - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val dbName = "db1" val tabName = "tab1" val fs = new Path(tmpDir.toString).getFileSystem(hiveContext.sessionState.newHadoopConf()) @@ -442,7 +442,7 @@ class HiveDDLSuite assert(!fs.exists(dbPath)) sql(s"CREATE DATABASE $dbName") - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db" val db1 = catalog.getDatabaseMetadata(dbName) assert(db1 == CatalogDatabase( @@ -518,7 +518,7 @@ class HiveDDLSuite test("desc table for data source table") { withTable("tab1") { val tabName = "tab1" - sqlContext.range(1).write.format("json").saveAsTable(tabName) + spark.range(1).write.format("json").saveAsTable(tabName) assert(sql(s"DESC $tabName").collect().length == 1) http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index d07ac56..dd4321d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -347,7 +347,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode") } - sqlContext.dropTempTable("testUDF") + spark.catalog.dropTempTable("testUDF") } test("Hive UDF in group by") { http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 1d597fe..2e4077d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -860,7 +860,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("Sorting columns are not in Generate") { withTempTable("data") { - sqlContext.range(1, 5) + spark.range(1, 5) .select(array($"id", $"id" + 1).as("a"), $"id".as("b"), (lit(10) - $"id").as("c")) .registerTempTable("data") @@ -1081,7 +1081,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // We don't support creating a temporary table while specifying a database val message = intercept[AnalysisException] { - sqlContext.sql( + spark.sql( s""" |CREATE TEMPORARY TABLE db.t |USING parquet @@ -1092,7 +1092,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { }.getMessage // If you use backticks to quote the name then it's OK. - sqlContext.sql( + spark.sql( s""" |CREATE TEMPORARY TABLE `db.t` |USING parquet @@ -1100,12 +1100,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { | path '$path' |) """.stripMargin) - checkAnswer(sqlContext.table("`db.t`"), df) + checkAnswer(spark.table("`db.t`"), df) } } test("SPARK-10593 same column names in lateral view") { - val df = sqlContext.sql( + val df = spark.sql( """ |select |insideLayer2.json as a2 @@ -1120,7 +1120,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { ignore("SPARK-10310: " + "script transformation using default input/output SerDe and record reader/writer") { - sqlContext + spark .range(5) .selectExpr("id AS a", "id AS b") .registerTempTable("test") @@ -1138,7 +1138,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } ignore("SPARK-10310: script transformation using LazySimpleSerDe") { - sqlContext + spark .range(5) .selectExpr("id AS a", "id AS b") .registerTempTable("test") @@ -1183,7 +1183,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } test("run sql directly on files") { - val df = sqlContext.range(100).toDF() + val df = spark.range(100).toDF() withTempPath(f => { df.write.parquet(f.getCanonicalPath) checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"), @@ -1325,14 +1325,14 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { Seq("3" -> "30").toDF("i", "j") .write.mode(SaveMode.Append).partitionBy("i").saveAsTable("tbl11453") checkAnswer( - sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"), + spark.read.table("tbl11453").select("i", "j").orderBy("i"), Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Nil) // make sure case sensitivity is correct. Seq("4" -> "40").toDF("i", "j") .write.mode(SaveMode.Append).partitionBy("I").saveAsTable("tbl11453") checkAnswer( - sqlContext.read.table("tbl11453").select("i", "j").orderBy("i"), + spark.read.table("tbl11453").select("i", "j").orderBy("i"), Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Row("4", "40") :: Nil) } } @@ -1370,7 +1370,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { test("multi-insert with lateral view") { withTempTable("t1") { - sqlContext.range(10) + spark.range(10) .select(array($"id", $"id" + 1).as("arr"), $"id") .registerTempTable("source") withTable("dest1", "dest2") { @@ -1388,10 +1388,10 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { """.stripMargin) checkAnswer( - sqlContext.table("dest1"), + spark.table("dest1"), sql("SELECT id FROM source WHERE id > 3")) checkAnswer( - sqlContext.table("dest2"), + spark.table("dest2"), sql("SELECT col FROM source LATERAL VIEW EXPLODE(arr) exp AS col WHERE col > 3")) } } @@ -1404,7 +1404,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withTempPath { dir => withTempTable("t1", "t2") { val path = dir.getCanonicalPath - val ds = sqlContext.range(10) + val ds = spark.range(10) ds.registerTempTable("t1") sql( @@ -1415,7 +1415,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { """.stripMargin) checkAnswer( - sqlContext.tables().select('isTemporary).filter('tableName === "t2"), + spark.wrapped.tables().select('isTemporary).filter('tableName === "t2"), Row(true) ) @@ -1429,7 +1429,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { "shouldn always be used together with PATH data source option" ) { withTempTable("t") { - sqlContext.range(10).registerTempTable("t") + spark.range(10).registerTempTable("t") val message = intercept[IllegalArgumentException] { sql( http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 72f9fba..f37037e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -30,11 +30,11 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { override def beforeAll(): Unit = { // Create a simple table with two columns: id and id1 - sqlContext.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt") + spark.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt") } override def afterAll(): Unit = { - sqlContext.sql(s"DROP TABLE IF EXISTS jt") + spark.sql(s"DROP TABLE IF EXISTS jt") } test("nested views (interleaved with temporary views)") { @@ -277,11 +277,11 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { withSQLConf( SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") { withTable("add_col") { - sqlContext.range(10).write.saveAsTable("add_col") + spark.range(10).write.saveAsTable("add_col") withView("v") { sql("CREATE VIEW v AS SELECT * FROM add_col") - sqlContext.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col") - checkAnswer(sql("SELECT * FROM v"), sqlContext.range(10).toDF()) + spark.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col") + checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF()) } } } @@ -291,8 +291,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // make sure the new flag can handle some complex cases like join and schema change. withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") { withTable("jt1", "jt2") { - sqlContext.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") - sqlContext.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") + spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") + spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala index d0e7552..cbbeacf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala @@ -353,7 +353,7 @@ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSi checkAnswer(actual, expected) - sqlContext.dropTempTable("nums") + spark.catalog.dropTempTable("nums") } test("SPARK-7595: Window will cause resolve failed with self join") { http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index b97da1f..965680f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -75,11 +75,11 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path) checkAnswer( - sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"), + spark.read.orc(path).where("not (a = 2) or not(b in ('1'))"), (1 to 5).map(i => Row(i, (i % 2).toString))) checkAnswer( - sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"), + spark.read.orc(path).where("not (a = 2 and b in ('1'))"), (1 to 5).map(i => Row(i, (i % 2).toString))) } } @@ -94,7 +94,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { .orc(path) // Check if this is compressed as ZLIB. - val conf = sqlContext.sessionState.newHadoopConf() + val conf = spark.sessionState.newHadoopConf() val fs = FileSystem.getLocal(conf) val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc")) assert(maybeOrcFile.isDefined) @@ -102,7 +102,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)) assert(orcReader.getCompression == CompressionKind.ZLIB) - val copyDf = sqlContext + val copyDf = spark .read .orc(path) checkAnswer(df, copyDf) http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index aa9c118..084546f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -66,7 +66,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withOrcFile(data) { file => checkAnswer( - sqlContext.read.orc(file), + spark.read.orc(file), data.toDF().collect()) } } @@ -170,7 +170,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { // Hive supports zlib, snappy and none for Hive 1.2.1. test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") { withTempPath { file => - sqlContext.range(0, 10).write + spark.range(0, 10).write .option("orc.compress", "ZLIB") .orc(file.getCanonicalPath) val expectedCompressionKind = @@ -179,7 +179,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } withTempPath { file => - sqlContext.range(0, 10).write + spark.range(0, 10).write .option("orc.compress", "SNAPPY") .orc(file.getCanonicalPath) val expectedCompressionKind = @@ -188,7 +188,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } withTempPath { file => - sqlContext.range(0, 10).write + spark.range(0, 10).write .option("orc.compress", "NONE") .orc(file.getCanonicalPath) val expectedCompressionKind = @@ -200,7 +200,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { // Following codec is not supported in Hive 1.2.1, ignore it now ignore("LZO compression options for writing to an ORC file not supported in Hive 1.2.1") { withTempPath { file => - sqlContext.range(0, 10).write + spark.range(0, 10).write .option("orc.compress", "LZO") .orc(file.getCanonicalPath) val expectedCompressionKind = @@ -301,12 +301,12 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withTempPath { dir => val path = dir.getCanonicalPath - sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save(path) - sqlContext.read.format("orc").load(path).schema("Acol") + spark.range(0, 10).select('id as "Acol").write.format("orc").save(path) + spark.read.format("orc").load(path).schema("Acol") intercept[IllegalArgumentException] { - sqlContext.read.format("orc").load(path).schema("acol") + spark.read.format("orc").load(path).schema("acol") } - checkAnswer(sqlContext.read.format("orc").load(path).select("acol").sort("acol"), + checkAnswer(spark.read.format("orc").load(path).select("acol").sort("acol"), (0 until 10).map(Row(_))) } } @@ -317,7 +317,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withTable("empty_orc") { withTempTable("empty", "single") { - sqlContext.sql( + spark.sql( s"""CREATE TABLE empty_orc(key INT, value STRING) |STORED AS ORC |LOCATION '$path' @@ -328,13 +328,13 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { // This creates 1 empty ORC file with Hive ORC SerDe. We are using this trick because // Spark SQL ORC data source always avoids write empty ORC files. - sqlContext.sql( + spark.sql( s"""INSERT INTO TABLE empty_orc |SELECT key, value FROM empty """.stripMargin) val errorMessage = intercept[AnalysisException] { - sqlContext.read.orc(path) + spark.read.orc(path) }.getMessage assert(errorMessage.contains("Unable to infer schema for ORC")) @@ -342,12 +342,12 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1) singleRowDF.registerTempTable("single") - sqlContext.sql( + spark.sql( s"""INSERT INTO TABLE empty_orc |SELECT key, value FROM single """.stripMargin) - val df = sqlContext.read.orc(path) + val df = spark.read.orc(path) assert(df.schema === singleRowDF.schema.asNullable) checkAnswer(df, singleRowDF) } @@ -373,7 +373,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { // It needs to repartition data so that we can have several ORC files // in order to skip stripes in ORC. createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path) - val df = sqlContext.read.orc(path) + val df = spark.read.orc(path) def checkPredicate(pred: Column, answer: Seq[Row]): Unit = { val sourceDf = stripSparkFilter(df.where(pred)) @@ -415,7 +415,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withTable("dummy_orc") { withTempTable("single") { - sqlContext.sql( + spark.sql( s"""CREATE TABLE dummy_orc(key INT, value STRING) |STORED AS ORC |LOCATION '$path' @@ -424,12 +424,12 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1) singleRowDF.registerTempTable("single") - sqlContext.sql( + spark.sql( s"""INSERT INTO TABLE dummy_orc |SELECT key, value FROM single """.stripMargin) - val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key=0") + val df = spark.sql("SELECT * FROM dummy_orc WHERE key=0") checkAnswer(df, singleRowDF) val queryExecution = df.queryExecution @@ -448,7 +448,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { val data = (0 until 10).map(i => Tuple1(Array(i))) withOrcFile(data) { file => - val actual = sqlContext + val actual = spark .read .orc(file) .where("_1 is not null") http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala index 637c106..aba60da 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala @@ -49,7 +49,7 @@ private[sql] trait OrcTest extends SQLTestUtils with TestHiveSingleton { protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag] (data: Seq[T]) (f: DataFrame => Unit): Unit = { - withOrcFile(data)(path => f(sqlContext.read.orc(path))) + withOrcFile(data)(path => f(spark.read.orc(path))) } /** @@ -61,7 +61,7 @@ private[sql] trait OrcTest extends SQLTestUtils with TestHiveSingleton { (data: Seq[T], tableName: String) (f: => Unit): Unit = { withOrcDataFrame(data) { df => - sqlContext.registerDataFrameAsTable(df, tableName) + spark.wrapped.registerDataFrameAsTable(df, tableName) withTempTable(tableName)(f) } } http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 1c1f6d9..6e93bbd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -634,7 +634,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { """.stripMargin) checkAnswer( - sqlContext.read.parquet(path), + spark.read.parquet(path), Row("1st", "2nd", Seq(Row("val_a", "val_b")))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index a3e7737..8bf6f22 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -105,7 +105,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle } // Read the bucket file into a dataframe, so that it's easier to test. - val readBack = sqlContext.read.format(source) + val readBack = spark.read.format(source) .load(bucketFile.getAbsolutePath) .select(columns: _*) http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala index 08e83b7..f9387fa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala @@ -34,7 +34,7 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton // Here we coalesce partition number to 1 to ensure that only a single task is issued. This // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary` // directory while committing/aborting the job. See SPARK-8513 for more details. - val df = sqlContext.range(0, 10).coalesce(1) + val df = spark.range(0, 10).coalesce(1) intercept[SparkException] { df.write.format(dataSourceName).save(file.getCanonicalPath) } @@ -49,7 +49,7 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton withTempPath { file => // fail the job in the middle of writing val divideByZero = udf((x: Int) => { x / (x - 1)}) - val df = sqlContext.range(0, 10).coalesce(1).select(divideByZero(col("id"))) + val df = spark.range(0, 10).coalesce(1).select(divideByZero(col("id"))) SimpleTextRelation.callbackCalled = false intercept[SparkException] { @@ -66,7 +66,7 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton SimpleTextRelation.failCommitter = false withTempPath { file => // fail the job in the middle of writing - val df = sqlContext.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id")) + val df = spark.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id")) SimpleTextRelation.callbackCalled = false SimpleTextRelation.failWriter = true http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 20c5f72..f4d6333 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.types._ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton { - import sqlContext.implicits._ + import spark.implicits._ val dataSourceName: String @@ -143,8 +143,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .add("index", IntegerType, nullable = false) .add("col", dataType, nullable = true) val rdd = - sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) - val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + spark.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator()))) + val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) df.write .mode("overwrite") @@ -153,7 +153,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .options(extraOptions) .save(path) - val loadedDF = sqlContext + val loadedDF = spark .read .format(dataSourceName) .option("dataSchema", df.schema.json) @@ -174,7 +174,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) checkAnswer( - sqlContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("path", file.getCanonicalPath) .option("dataSchema", dataSchema.json) .load(), @@ -188,7 +188,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes testDF.write.mode(SaveMode.Append).format(dataSourceName).save(file.getCanonicalPath) checkAnswer( - sqlContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchema.json) .load(file.getCanonicalPath).orderBy("a"), testDF.union(testDF).orderBy("a").collect()) @@ -208,7 +208,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath) val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(sqlContext.sessionState.newHadoopConf()) + val fs = path.getFileSystem(spark.sessionState.newHadoopConf()) assert(fs.listStatus(path).isEmpty) } } @@ -222,7 +222,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .save(file.getCanonicalPath) checkQueries( - sqlContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchema.json) .load(file.getCanonicalPath)) } @@ -243,7 +243,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .save(file.getCanonicalPath) checkAnswer( - sqlContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchema.json) .load(file.getCanonicalPath), partitionedTestDF.collect()) @@ -265,7 +265,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .save(file.getCanonicalPath) checkAnswer( - sqlContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchema.json) .load(file.getCanonicalPath), partitionedTestDF.union(partitionedTestDF).collect()) @@ -287,7 +287,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .save(file.getCanonicalPath) checkAnswer( - sqlContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchema.json) .load(file.getCanonicalPath), partitionedTestDF.collect()) @@ -323,7 +323,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .saveAsTable("t") withTable("t") { - checkAnswer(sqlContext.table("t"), testDF.collect()) + checkAnswer(spark.table("t"), testDF.collect()) } } @@ -332,7 +332,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t") withTable("t") { - checkAnswer(sqlContext.table("t"), testDF.union(testDF).orderBy("a").collect()) + checkAnswer(spark.table("t"), testDF.union(testDF).orderBy("a").collect()) } } @@ -351,7 +351,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes withTempTable("t") { testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t") - assert(sqlContext.table("t").collect().isEmpty) + assert(spark.table("t").collect().isEmpty) } } @@ -362,18 +362,18 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .saveAsTable("t") withTable("t") { - checkQueries(sqlContext.table("t")) + checkQueries(spark.table("t")) } } test("saveAsTable()/load() - partitioned table - boolean type") { - sqlContext.range(2) + spark.range(2) .select('id, ('id % 2 === 0).as("b")) .write.partitionBy("b").saveAsTable("t") withTable("t") { checkAnswer( - sqlContext.table("t").sort('id), + spark.table("t").sort('id), Row(0, true) :: Row(1, false) :: Nil ) } @@ -395,7 +395,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .saveAsTable("t") withTable("t") { - checkAnswer(sqlContext.table("t"), partitionedTestDF.collect()) + checkAnswer(spark.table("t"), partitionedTestDF.collect()) } } @@ -415,7 +415,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .saveAsTable("t") withTable("t") { - checkAnswer(sqlContext.table("t"), partitionedTestDF.union(partitionedTestDF).collect()) + checkAnswer(spark.table("t"), partitionedTestDF.union(partitionedTestDF).collect()) } } @@ -435,7 +435,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .saveAsTable("t") withTable("t") { - checkAnswer(sqlContext.table("t"), partitionedTestDF.collect()) + checkAnswer(spark.table("t"), partitionedTestDF.collect()) } } @@ -484,7 +484,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .partitionBy("p1", "p2") .saveAsTable("t") - assert(sqlContext.table("t").collect().isEmpty) + assert(spark.table("t").collect().isEmpty) } } @@ -516,7 +516,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes // Inferring schema should throw error as it should not find any file to infer val e = intercept[Exception] { - sqlContext.read.format(dataSourceName).load(dir.getCanonicalPath) + spark.read.format(dataSourceName).load(dir.getCanonicalPath) } e match { @@ -533,7 +533,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes /** Test whether data is read with the given path matches the expected answer */ def testWithPath(path: File, expectedAnswer: Seq[Row]): Unit = { - val df = sqlContext.read + val df = spark.read .format(dataSourceName) .schema(dataInDir.schema) // avoid schema inference for any format .load(path.getCanonicalPath) @@ -618,7 +618,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes /** Check whether data is read with the given path matches the expected answer */ def check(path: String, expectedDf: DataFrame): Unit = { - val df = sqlContext.read + val df = spark.read .format(dataSourceName) .schema(schema) // avoid schema inference for any format, expected to be same format .load(path) @@ -654,7 +654,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes basePath: Option[String] = None ): Unit = { try { - val reader = sqlContext.read + val reader = spark.read basePath.foreach(reader.option("basePath", _)) val testDf = reader .format(dataSourceName) @@ -739,7 +739,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes val realData = input.collect() - checkAnswer(sqlContext.table("t"), realData ++ realData) + checkAnswer(spark.table("t"), realData ++ realData) } } } @@ -754,7 +754,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .saveAsTable("t") withTable("t") { - checkAnswer(sqlContext.table("t").select('b, 'c, 'a), df.select('b, 'c, 'a).collect()) + checkAnswer(spark.table("t").select('b, 'c, 'a), df.select('b, 'c, 'a).collect()) } } @@ -766,7 +766,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes test("SPARK-8406: Avoids name collision while writing files") { withTempPath { dir => val path = dir.getCanonicalPath - sqlContext + spark .range(10000) .repartition(250) .write @@ -775,7 +775,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes .save(path) assertResult(10000) { - sqlContext + spark .read .format(dataSourceName) .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json) @@ -794,7 +794,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes classOf[AlwaysFailParquetOutputCommitter].getName ) - val df = sqlContext.range(1, 10).toDF("i") + val df = spark.range(1, 10).toDF("i") withTempPath { dir => df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) // Because there data already exists, @@ -802,7 +802,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes // with file format and AlwaysFailOutputCommitter will not be used. df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) checkAnswer( - sqlContext.read + spark.read .format(dataSourceName) .option("dataSchema", df.schema.json) .options(extraOptions) @@ -850,12 +850,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes ) withTempPath { dir => val path = "file://" + dir.getCanonicalPath - val df1 = sqlContext.range(4) + val df1 = spark.range(4) df1.coalesce(1).write.mode("overwrite").options(options).format(dataSourceName).save(path) df1.coalesce(1).write.mode("append").options(options).format(dataSourceName).save(path) def checkLocality(): Unit = { - val df2 = sqlContext.read + val df2 = spark.read .format(dataSourceName) .option("dataSchema", df1.schema.json) .options(options) http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 1d10488..4b4852c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -126,18 +126,18 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { test("SPARK-8604: Parquet data source should write summary file while doing appending") { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(0, 5).toDF() + val df = spark.range(0, 5).toDF() df.write.mode(SaveMode.Overwrite).parquet(path) val summaryPath = new Path(path, "_metadata") val commonSummaryPath = new Path(path, "_common_metadata") - val fs = summaryPath.getFileSystem(sqlContext.sessionState.newHadoopConf()) + val fs = summaryPath.getFileSystem(spark.sessionState.newHadoopConf()) fs.delete(summaryPath, true) fs.delete(commonSummaryPath, true) df.write.mode(SaveMode.Append).parquet(path) - checkAnswer(sqlContext.read.parquet(path), df.union(df)) + checkAnswer(spark.read.parquet(path), df.union(df)) assert(fs.exists(summaryPath)) assert(fs.exists(commonSummaryPath)) @@ -148,8 +148,8 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { withTempPath { dir => val path = dir.getCanonicalPath - sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path) - val df = sqlContext.read.parquet(path).filter('a === 0).select('b) + spark.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path) + val df = spark.read.parquet(path).filter('a === 0).select('b) val physicalPlan = df.queryExecution.sparkPlan assert(physicalPlan.collect { case p: execution.ProjectExec => p }.length === 1) @@ -170,7 +170,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { // The schema consists of the leading columns of the first part-file // in the lexicographic order. - assert(sqlContext.read.parquet(dir.getCanonicalPath).schema.map(_.name) + assert(spark.read.parquet(dir.getCanonicalPath).schema.map(_.name) === Seq("a", "b", "c", "d", "part")) } } @@ -188,8 +188,8 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { Row(5, 127.toByte), Row(6, -44.toByte), Row(7, 23.toByte), Row(8, -95.toByte), Row(9, 127.toByte), Row(10, 13.toByte)) - val rdd = sqlContext.sparkContext.parallelize(data) - val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1) + val rdd = spark.sparkContext.parallelize(data) + val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1) df.write .mode("overwrite") @@ -197,7 +197,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { .option("dataSchema", df.schema.json) .save(path) - val loadedDF = sqlContext + val loadedDF = spark .read .format(dataSourceName) .option("dataSchema", df.schema.json) @@ -221,7 +221,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { val compressedFiles = new File(path).listFiles() assert(compressedFiles.exists(_.getName.endsWith(".gz.parquet"))) - val copyDf = sqlContext + val copyDf = spark .read .parquet(path) checkAnswer(df, copyDf) http://git-wip-us.apache.org/repos/asf/spark/blob/ed0b4070/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index 9ad0887..fa64c7d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -69,7 +69,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat test("test hadoop conf option propagation") { withTempPath { file => // Test write side - val df = sqlContext.range(10).selectExpr("cast(id as string)") + val df = spark.range(10).selectExpr("cast(id as string)") df.write .option("some-random-write-option", "hahah-WRITE") .option("some-null-value-option", null) // test null robustness @@ -78,7 +78,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat assert(SimpleTextRelation.lastHadoopConf.get.get("some-random-write-option") == "hahah-WRITE") // Test read side - val df1 = sqlContext.read + val df1 = spark.read .option("some-random-read-option", "hahah-READ") .option("some-null-value-option", null) // test null robustness .option("dataSchema", df.schema.json) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org