http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index 1742df3..c31dffe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -27,16 +27,16 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowComments off") {
     val str = """{'name': /* hello */ 'Reynold Xin'}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.json(rdd)
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowComments on") {
     val str = """{'name': /* hello */ 'Reynold Xin'}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.option("allowComments", "true").json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.option("allowComments", "true").json(rdd)
 
     assert(df.schema.head.name == "name")
     assert(df.first().getString(0) == "Reynold Xin")
@@ -44,16 +44,16 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowSingleQuotes off") {
     val str = """{'name': 'Reynold Xin'}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.option("allowSingleQuotes", "false").json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.option("allowSingleQuotes", "false").json(rdd)
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowSingleQuotes on") {
     val str = """{'name': 'Reynold Xin'}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.json(rdd)
 
     assert(df.schema.head.name == "name")
     assert(df.first().getString(0) == "Reynold Xin")
@@ -61,16 +61,16 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowUnquotedFieldNames off") {
     val str = """{name: 'Reynold Xin'}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.json(rdd)
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowUnquotedFieldNames on") {
     val str = """{name: 'Reynold Xin'}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.option("allowUnquotedFieldNames", "true").json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.option("allowUnquotedFieldNames", "true").json(rdd)
 
     assert(df.schema.head.name == "name")
     assert(df.first().getString(0) == "Reynold Xin")
@@ -78,16 +78,16 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowNumericLeadingZeros off") {
     val str = """{"age": 0018}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.json(rdd)
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowNumericLeadingZeros on") {
     val str = """{"age": 0018}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.option("allowNumericLeadingZeros", "true").json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.option("allowNumericLeadingZeros", "true").json(rdd)
 
     assert(df.schema.head.name == "age")
     assert(df.first().getLong(0) == 18)
@@ -97,16 +97,16 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
   // JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
   ignore("allowNonNumericNumbers off") {
     val str = """{"age": NaN}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.json(rdd)
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   ignore("allowNonNumericNumbers on") {
     val str = """{"age": NaN}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.option("allowNonNumericNumbers", "true").json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.option("allowNonNumericNumbers", "true").json(rdd)
 
     assert(df.schema.head.name == "age")
     assert(df.first().getDouble(0).isNaN)
@@ -114,16 +114,16 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
 
   test("allowBackslashEscapingAnyCharacter off") {
     val str = """{"name": "Cazen Lee", "price": "\$10"}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.option("allowBackslashEscapingAnyCharacter", "false").json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(rdd)
 
     assert(df.schema.head.name == "_corrupt_record")
   }
 
   test("allowBackslashEscapingAnyCharacter on") {
     val str = """{"name": "Cazen Lee", "price": "\$10"}"""
-    val rdd = sqlContext.sparkContext.parallelize(Seq(str))
-    val df = sqlContext.read.option("allowBackslashEscapingAnyCharacter", "true").json(rdd)
+    val rdd = spark.sparkContext.parallelize(Seq(str))
+    val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(rdd)
 
     assert(df.schema.head.name == "name")
     assert(df.schema.last.name == "price")
http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b1279ab..63fe465 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -229,7 +229,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Complex field and type inferring with null in sampling") { - val jsonDF = sqlContext.read.json(jsonNullStruct) + val jsonDF = spark.read.json(jsonNullStruct) val expectedSchema = StructType( StructField("headers", StructType( StructField("Charset", StringType, true) :: @@ -248,7 +248,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Primitive field and type inferring") { - val jsonDF = sqlContext.read.json(primitiveFieldAndType) + val jsonDF = spark.read.json(primitiveFieldAndType) val expectedSchema = StructType( StructField("bigInteger", DecimalType(20, 0), true) :: @@ -276,7 +276,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Complex field and type inferring") { - val jsonDF = sqlContext.read.json(complexFieldAndType1) + val jsonDF = spark.read.json(complexFieldAndType1) val expectedSchema = StructType( StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) :: @@ -375,7 +375,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("GetField operation on complex data type") { - val jsonDF = sqlContext.read.json(complexFieldAndType1) + val jsonDF = spark.read.json(complexFieldAndType1) jsonDF.registerTempTable("jsonTable") checkAnswer( @@ -391,7 +391,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Type conflict in primitive field values") { - val jsonDF = sqlContext.read.json(primitiveFieldValueTypeConflict) + val jsonDF = spark.read.json(primitiveFieldValueTypeConflict) val expectedSchema = StructType( StructField("num_bool", StringType, true) :: @@ -463,7 +463,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } ignore("Type conflict in primitive field values (Ignored)") { - val jsonDF = sqlContext.read.json(primitiveFieldValueTypeConflict) + val jsonDF = spark.read.json(primitiveFieldValueTypeConflict) jsonDF.registerTempTable("jsonTable") // Right now, the analyzer does not promote strings in a boolean expression. 
@@ -516,7 +516,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Type conflict in complex field values") { - val jsonDF = sqlContext.read.json(complexFieldValueTypeConflict) + val jsonDF = spark.read.json(complexFieldValueTypeConflict) val expectedSchema = StructType( StructField("array", ArrayType(LongType, true), true) :: @@ -540,7 +540,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Type conflict in array elements") { - val jsonDF = sqlContext.read.json(arrayElementTypeConflict) + val jsonDF = spark.read.json(arrayElementTypeConflict) val expectedSchema = StructType( StructField("array1", ArrayType(StringType, true), true) :: @@ -568,7 +568,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Handling missing fields") { - val jsonDF = sqlContext.read.json(missingFields) + val jsonDF = spark.read.json(missingFields) val expectedSchema = StructType( StructField("a", BooleanType, true) :: @@ -588,7 +588,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { dir.delete() val path = dir.getCanonicalPath primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - val jsonDF = sqlContext.read.json(path) + val jsonDF = spark.read.json(path) val expectedSchema = StructType( StructField("bigInteger", DecimalType(20, 0), true) :: @@ -620,7 +620,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { dir.delete() val path = dir.getCanonicalPath primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - val jsonDF = sqlContext.read.option("primitivesAsString", "true").json(path) + val jsonDF = spark.read.option("primitivesAsString", "true").json(path) val expectedSchema = StructType( StructField("bigInteger", StringType, true) :: @@ -648,7 +648,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Loading a JSON dataset primitivesAsString returns complex fields as strings") { - val jsonDF = sqlContext.read.option("primitivesAsString", "true").json(complexFieldAndType1) + val jsonDF = spark.read.option("primitivesAsString", "true").json(complexFieldAndType1) val expectedSchema = StructType( StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) :: @@ -746,7 +746,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Loading a JSON dataset prefersDecimal returns schema with float types as BigDecimal") { - val jsonDF = sqlContext.read.option("prefersDecimal", "true").json(primitiveFieldAndType) + val jsonDF = spark.read.option("prefersDecimal", "true").json(primitiveFieldAndType) val expectedSchema = StructType( StructField("bigInteger", DecimalType(20, 0), true) :: @@ -777,7 +777,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val mixedIntegerAndDoubleRecords = sparkContext.parallelize( """{"a": 3, "b": 1.1}""" :: s"""{"a": 3.1, "b": 0.${"0" * 38}1}""" :: Nil) - val jsonDF = sqlContext.read + val jsonDF = spark.read .option("prefersDecimal", "true") .json(mixedIntegerAndDoubleRecords) @@ -796,7 +796,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Infer big integers correctly even when it does not fit in decimal") { - val jsonDF = sqlContext.read + val jsonDF = spark.read .json(bigIntegerRecords) // The value in `a` field will be a double as it does not fit in decimal. 
For `b` field, @@ -810,7 +810,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Infer floating-point values correctly even when it does not fit in decimal") { - val jsonDF = sqlContext.read + val jsonDF = spark.read .option("prefersDecimal", "true") .json(floatingValueRecords) @@ -823,7 +823,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(expectedSchema === jsonDF.schema) checkAnswer(jsonDF, Row(1.0E-39D, BigDecimal(0.01))) - val mergedJsonDF = sqlContext.read + val mergedJsonDF = spark.read .option("prefersDecimal", "true") .json(floatingValueRecords ++ bigIntegerRecords) @@ -881,7 +881,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { StructField("null", StringType, true) :: StructField("string", StringType, true) :: Nil) - val jsonDF1 = sqlContext.read.schema(schema).json(path) + val jsonDF1 = spark.read.schema(schema).json(path) assert(schema === jsonDF1.schema) @@ -898,7 +898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { "this is a simple string.") ) - val jsonDF2 = sqlContext.read.schema(schema).json(primitiveFieldAndType) + val jsonDF2 = spark.read.schema(schema).json(primitiveFieldAndType) assert(schema === jsonDF2.schema) @@ -919,7 +919,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("Applying schemas with MapType") { val schemaWithSimpleMap = StructType( StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) - val jsonWithSimpleMap = sqlContext.read.schema(schemaWithSimpleMap).json(mapType1) + val jsonWithSimpleMap = spark.read.schema(schemaWithSimpleMap).json(mapType1) jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap") @@ -947,7 +947,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val schemaWithComplexMap = StructType( StructField("map", MapType(StringType, innerStruct, true), false) :: Nil) - val jsonWithComplexMap = sqlContext.read.schema(schemaWithComplexMap).json(mapType2) + val jsonWithComplexMap = spark.read.schema(schemaWithComplexMap).json(mapType2) jsonWithComplexMap.registerTempTable("jsonWithComplexMap") @@ -973,7 +973,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-2096 Correctly parse dot notations") { - val jsonDF = sqlContext.read.json(complexFieldAndType2) + val jsonDF = spark.read.json(complexFieldAndType2) jsonDF.registerTempTable("jsonTable") checkAnswer( @@ -991,7 +991,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-3390 Complex arrays") { - val jsonDF = sqlContext.read.json(complexFieldAndType2) + val jsonDF = spark.read.json(complexFieldAndType2) jsonDF.registerTempTable("jsonTable") checkAnswer( @@ -1014,7 +1014,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-3308 Read top level JSON arrays") { - val jsonDF = sqlContext.read.json(jsonArray) + val jsonDF = spark.read.json(jsonArray) jsonDF.registerTempTable("jsonTable") checkAnswer( @@ -1035,7 +1035,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { StructField("a", StringType, true) :: Nil) // `FAILFAST` mode should throw an exception for corrupt records. 
val exceptionOne = intercept[SparkException] { - sqlContext.read + spark.read .option("mode", "FAILFAST") .json(corruptRecords) .collect() @@ -1043,7 +1043,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {")) val exceptionTwo = intercept[SparkException] { - sqlContext.read + spark.read .option("mode", "FAILFAST") .schema(schema) .json(corruptRecords) @@ -1060,7 +1060,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val schemaTwo = StructType( StructField("a", StringType, true) :: Nil) // `DROPMALFORMED` mode should skip corrupt records - val jsonDFOne = sqlContext.read + val jsonDFOne = spark.read .option("mode", "DROPMALFORMED") .json(corruptRecords) checkAnswer( @@ -1069,7 +1069,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) assert(jsonDFOne.schema === schemaOne) - val jsonDFTwo = sqlContext.read + val jsonDFTwo = spark.read .option("mode", "DROPMALFORMED") .schema(schemaTwo) .json(corruptRecords) @@ -1083,7 +1083,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // Test if we can query corrupt records. withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { withTempTable("jsonTable") { - val jsonDF = sqlContext.read.json(corruptRecords) + val jsonDF = spark.read.json(corruptRecords) jsonDF.registerTempTable("jsonTable") val schema = StructType( StructField("_unparsed", StringType, true) :: @@ -1134,7 +1134,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-13953 Rename the corrupt record field via option") { - val jsonDF = sqlContext.read + val jsonDF = spark.read .option("columnNameOfCorruptRecord", "_malformed") .json(corruptRecords) val schema = StructType( @@ -1155,7 +1155,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("SPARK-4068: nulls in arrays") { - val jsonDF = sqlContext.read.json(nullsInArrays) + val jsonDF = spark.read.json(nullsInArrays) jsonDF.registerTempTable("jsonTable") val schema = StructType( @@ -1201,7 +1201,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5) } - val df1 = sqlContext.createDataFrame(rowRDD1, schema1) + val df1 = spark.createDataFrame(rowRDD1, schema1) df1.registerTempTable("applySchema1") val df2 = df1.toDF val result = df2.toJSON.collect() @@ -1224,7 +1224,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4)) } - val df3 = sqlContext.createDataFrame(rowRDD2, schema2) + val df3 = spark.createDataFrame(rowRDD2, schema2) df3.registerTempTable("applySchema2") val df4 = df3.toDF val result2 = df4.toJSON.collect() @@ -1232,8 +1232,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}") assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}") - val jsonDF = sqlContext.read.json(primitiveFieldAndType) - val primTable = sqlContext.read.json(jsonDF.toJSON.rdd) + val jsonDF = spark.read.json(primitiveFieldAndType) + val primTable = spark.read.json(jsonDF.toJSON.rdd) primTable.registerTempTable("primitiveTable") checkAnswer( sql("select * from primitiveTable"), @@ -1245,8 +1245,8 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData { "this is a simple string.") ) - val complexJsonDF = sqlContext.read.json(complexFieldAndType1) - val compTable = sqlContext.read.json(complexJsonDF.toJSON.rdd) + val complexJsonDF = spark.read.json(complexFieldAndType1) + val compTable = spark.read.json(complexJsonDF.toJSON.rdd) compTable.registerTempTable("complexTable") // Access elements of a primitive array. checkAnswer( @@ -1316,7 +1316,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path) val d1 = DataSource( - sqlContext.sparkSession, + spark, userSpecifiedSchema = None, partitionColumns = Array.empty[String], bucketSpec = None, @@ -1324,7 +1324,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { options = Map("path" -> path)).resolveRelation() val d2 = DataSource( - sqlContext.sparkSession, + spark, userSpecifiedSchema = None, partitionColumns = Array.empty[String], bucketSpec = None, @@ -1345,16 +1345,16 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { withTempDir { dir => val schemaWithSimpleMap = StructType( StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) - val df = sqlContext.read.schema(schemaWithSimpleMap).json(mapType1) + val df = spark.read.schema(schemaWithSimpleMap).json(mapType1) val path = dir.getAbsolutePath df.write.mode("overwrite").parquet(path) // order of MapType is not defined - assert(sqlContext.read.parquet(path).count() == 5) + assert(spark.read.parquet(path).count() == 5) - val df2 = sqlContext.read.json(corruptRecords) + val df2 = spark.read.json(corruptRecords) df2.write.mode("overwrite").parquet(path) - checkAnswer(sqlContext.read.parquet(path), df2.collect()) + checkAnswer(spark.read.parquet(path), df2.collect()) } } } @@ -1387,7 +1387,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { "col1", "abd") - sqlContext.read.json(root.getAbsolutePath).registerTempTable("test_myjson_with_part") + spark.read.json(root.getAbsolutePath).registerTempTable("test_myjson_with_part") checkAnswer(sql( "SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abc'"), Row(4)) checkAnswer(sql( @@ -1447,7 +1447,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { Row(4.75.toFloat, Seq(false, true)), new UDT.MyDenseVector(Array(0.25, 2.25, 4.25))) val data = - Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil + Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil // Data generated by previous versions. // scalastyle:off @@ -1462,7 +1462,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // scalastyle:on // Generate data for the current version. 
- val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema) withTempPath { path => df.write.format("json").mode("overwrite").save(path.getCanonicalPath) @@ -1486,13 +1486,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { "Spark 1.4.1", "Spark 1.5.0", "Spark 1.5.0", - "Spark " + sqlContext.sparkContext.version, - "Spark " + sqlContext.sparkContext.version) + "Spark " + spark.sparkContext.version, + "Spark " + spark.sparkContext.version) val expectedResult = col0Values.map { v => Row.fromSeq(Seq(v) ++ constantValues) } checkAnswer( - sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath), + spark.read.format("json").schema(schema).load(path.getCanonicalPath), expectedResult ) } @@ -1502,16 +1502,16 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(2) + val df = spark.range(2) df.write.json(path + "/p=1") df.write.json(path + "/p=2") - assert(sqlContext.read.json(path).count() === 4) + assert(spark.read.json(path).count() === 4) val extraOptions = Map( "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName, "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName ) - assert(sqlContext.read.options(extraOptions).json(path).count() === 2) + assert(spark.read.options(extraOptions).json(path).count() === 2) } } @@ -1525,12 +1525,12 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { { // We need to make sure we can infer the schema. - val jsonDF = sqlContext.read.json(additionalCorruptRecords) + val jsonDF = spark.read.json(additionalCorruptRecords) assert(jsonDF.schema === schema) } { - val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords) + val jsonDF = spark.read.schema(schema).json(additionalCorruptRecords) jsonDF.registerTempTable("jsonTable") // In HiveContext, backticks should be used to access columns starting with a underscore. 
@@ -1563,7 +1563,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { StructField("a", StructType( StructField("b", StringType) :: Nil )) :: Nil) - val jsonDF = sqlContext.read.schema(schema).json(path) + val jsonDF = spark.read.schema(schema).json(path) assert(jsonDF.count() == 2) } } @@ -1575,7 +1575,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val path = dir.getCanonicalPath primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - val jsonDF = sqlContext.read.json(path) + val jsonDF = spark.read.json(path) val jsonDir = new File(dir, "json").getCanonicalPath jsonDF.coalesce(1).write .format("json") @@ -1585,7 +1585,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val compressedFiles = new File(jsonDir).listFiles() assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) - val jsonCopy = sqlContext.read + val jsonCopy = spark.read .format("json") .load(jsonDir) @@ -1611,7 +1611,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val path = dir.getCanonicalPath primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - val jsonDF = sqlContext.read.json(path) + val jsonDF = spark.read.json(path) val jsonDir = new File(dir, "json").getCanonicalPath jsonDF.coalesce(1).write .format("json") @@ -1622,7 +1622,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val compressedFiles = new File(jsonDir).listFiles() assert(compressedFiles.exists(!_.getName.endsWith(".json.gz"))) - val jsonCopy = sqlContext.read + val jsonCopy = spark.read .format("json") .options(extraOptions) .load(jsonDir) @@ -1637,7 +1637,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("Casting long as timestamp") { withTempTable("jsonTable") { val schema = (new StructType).add("ts", TimestampType) - val jsonDF = sqlContext.read.schema(schema).json(timestampAsLong) + val jsonDF = spark.read.schema(schema).json(timestampAsLong) jsonDF.registerTempTable("jsonTable") @@ -1657,8 +1657,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val json = s""" |{"a": [{$nested}], "b": [{$nested}]} """.stripMargin - val rdd = sqlContext.sparkContext.makeRDD(Seq(json)) - val df = sqlContext.read.json(rdd) + val rdd = spark.sparkContext.makeRDD(Seq(json)) + val df = spark.read.json(rdd) assert(df.schema.size === 2) df.collect() } http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index 2873c6a..f4a3336 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -18,13 +18,13 @@ package org.apache.spark.sql.execution.datasources.json import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SparkSession private[json] trait TestJsonData { - protected def sqlContext: SQLContext + protected def spark: SparkSession def primitiveFieldAndType: RDD[String] = - sqlContext.sparkContext.parallelize( + 
spark.sparkContext.parallelize( """{"string":"this is a simple string.", "integer":10, "long":21474836470, @@ -35,7 +35,7 @@ private[json] trait TestJsonData { }""" :: Nil) def primitiveFieldValueTypeConflict: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1, "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" :: """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null, @@ -46,14 +46,14 @@ private[json] trait TestJsonData { "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil) def jsonNullStruct: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" :: """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" :: """{"nullstr":"","ip":"27.31.100.29","headers":""}""" :: """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil) def complexFieldValueTypeConflict: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"num_struct":11, "str_array":[1, 2, 3], "array":[], "struct_array":[], "struct": {}}""" :: """{"num_struct":{"field":false}, "str_array":null, @@ -64,14 +64,14 @@ private[json] trait TestJsonData { "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil) def arrayElementTypeConflict: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}], "array2": [{"field":214748364700}, {"field":1}]}""" :: """{"array3": [{"field":"str"}, {"field":1}]}""" :: """{"array3": [1, 2, 3]}""" :: Nil) def missingFields: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"a":true}""" :: """{"b":21474836470}""" :: """{"c":[33, 44]}""" :: @@ -79,7 +79,7 @@ private[json] trait TestJsonData { """{"e":"str"}""" :: Nil) def complexFieldAndType1: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], @@ -95,7 +95,7 @@ private[json] trait TestJsonData { }""" :: Nil) def complexFieldAndType2: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "complexArrayOfStruct": [ { @@ -149,7 +149,7 @@ private[json] trait TestJsonData { }""" :: Nil) def mapType1: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"map": {"a": 1}}""" :: """{"map": {"b": 2}}""" :: """{"map": {"c": 3}}""" :: @@ -157,7 +157,7 @@ private[json] trait TestJsonData { """{"map": {"e": null}}""" :: Nil) def mapType2: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" :: """{"map": {"b": {"field2": 2}}}""" :: """{"map": {"c": {"field1": [], "field2": 4}}}""" :: @@ -166,21 +166,21 @@ private[json] trait TestJsonData { """{"map": {"f": {"field1": null}}}""" :: Nil) def nullsInArrays: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"field1":[[null], [[["Test"]]]]}""" :: """{"field2":[null, [{"Test":1}]]}""" :: """{"field3":[[null], [{"Test":"2"}]]}""" :: """{"field4":[[null, [1,2,3]]]}""" :: Nil) def jsonArray: 
RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """[{"a":"str_a_1"}]""" :: """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" :: """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: """[]""" :: Nil) def corruptRecords: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{""" :: """""" :: """{"a":1, b:2}""" :: @@ -189,7 +189,7 @@ private[json] trait TestJsonData { """]""" :: Nil) def additionalCorruptRecords: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"dummy":"test"}""" :: """[1,2,3]""" :: """":"test", "a":1}""" :: @@ -197,7 +197,7 @@ private[json] trait TestJsonData { """ ","ian":"test"}""" :: Nil) def emptyRecords: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{""" :: """""" :: """{"a": {}}""" :: @@ -206,23 +206,23 @@ private[json] trait TestJsonData { """]""" :: Nil) def timestampAsLong: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"ts":1451732645}""" :: Nil) def arrayAndStructRecords: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( """{"a": {"b": 1}}""" :: """{"a": []}""" :: Nil) def floatingValueRecords: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil) def bigIntegerRecords: RDD[String] = - sqlContext.sparkContext.parallelize( + spark.sparkContext.parallelize( s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil) - lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil) + lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil) - def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]()) + def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]()) } http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index f98ea8c..6509e04 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -67,7 +67,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared logParquetSchema(path) - checkAnswer(sqlContext.read.parquet(path), (0 until 10).map { i => + checkAnswer(spark.read.parquet(path), (0 until 10).map { i => Row( i % 2 == 0, i, @@ -114,7 +114,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared logParquetSchema(path) - checkAnswer(sqlContext.read.parquet(path), (0 until 10).map { i => + checkAnswer(spark.read.parquet(path), (0 until 10).map { i => if (i % 3 == 0) { Row.apply(Seq.fill(7)(null): _*) } else { @@ -155,7 +155,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared logParquetSchema(path) - checkAnswer(sqlContext.read.parquet(path), (0 until 10).map { i => + checkAnswer(spark.read.parquet(path), (0 until 10).map { i => Row( Seq.tabulate(3)(i => s"val_$i"), if (i % 
3 == 0) null else Seq.tabulate(3)(identity)) @@ -182,7 +182,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared logParquetSchema(path) - checkAnswer(sqlContext.read.parquet(path), (0 until 10).map { i => + checkAnswer(spark.read.parquet(path), (0 until 10).map { i => Row(Seq.tabulate(3, 3)((i, j) => i * 3 + j)) }) } @@ -205,7 +205,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared logParquetSchema(path) - checkAnswer(sqlContext.read.parquet(path), (0 until 10).map { i => + checkAnswer(spark.read.parquet(path), (0 until 10).map { i => Row(Seq.tabulate(3)(i => i.toString -> Seq.tabulate(3)(j => i + j)).toMap) }) } @@ -221,7 +221,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared logParquetSchema(path) - checkAnswer(sqlContext.read.parquet(path), (0 until 10).map { i => + checkAnswer(spark.read.parquet(path), (0 until 10).map { i => Row( Seq.tabulate(3)(n => s"arr_${i + n}"), Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap, @@ -267,7 +267,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared } } - checkAnswer(sqlContext.read.parquet(path).filter('suit === "SPADES"), Row("SPADES")) + checkAnswer(spark.read.parquet(path).filter('suit === "SPADES"), Row("SPADES")) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index 45cc681..57cd70e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -38,7 +38,7 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq } protected def readParquetSchema(path: String, pathFilter: Path => Boolean): MessageType = { - val hadoopConf = sqlContext.sessionState.newHadoopConf() + val hadoopConf = spark.sessionState.newHadoopConf() val fsPath = new Path(path) val fs = fsPath.getFileSystem(hadoopConf) val parquetFiles = fs.listStatus(fsPath, new PathFilter { http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 65635e3..45fd6a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -304,7 +304,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "part = 1" filter gets pushed down, this query will throw an exception since // "part" is not a valid column in the actual Parquet file checkAnswer( - sqlContext.read.parquet(dir.getCanonicalPath).filter("part = 1"), + spark.read.parquet(dir.getCanonicalPath).filter("part = 
1"), (1 to 3).map(i => Row(i, i.toString, 1))) } } @@ -321,7 +321,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "part = 1" filter gets pushed down, this query will throw an exception since // "part" is not a valid column in the actual Parquet file checkAnswer( - sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 0 and (part = 0 or a > 1)"), + spark.read.parquet(dir.getCanonicalPath).filter("a > 0 and (part = 0 or a > 1)"), (2 to 3).map(i => Row(i, i.toString, 1))) } } @@ -339,7 +339,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // The filter "a > 1 or b < 2" will not get pushed down, and the projection is empty, // this query will throw an exception since the project from combinedFilter expect // two projection while the - val df1 = sqlContext.read.parquet(dir.getCanonicalPath) + val df1 = spark.read.parquet(dir.getCanonicalPath) assert(df1.filter("a > 1 or b < 2").count() == 2) } @@ -358,7 +358,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // test the generate new projection case // when projects != partitionAndNormalColumnProjs - val df1 = sqlContext.read.parquet(dir.getCanonicalPath) + val df1 = spark.read.parquet(dir.getCanonicalPath) checkAnswer( df1.filter("a > 1 or b > 2").orderBy("a").selectExpr("a", "b", "c", "d"), @@ -381,7 +381,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "c = 1" filter gets pushed down, this query will throw an exception which // Parquet emits. This is a Parquet issue (PARQUET-389). - val df = sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") + val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") checkAnswer( df, Row(1, "1", null)) @@ -394,7 +394,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex df.write.parquet(pathThree) // We will remove the temporary metadata when writing Parquet file. - val schema = sqlContext.read.parquet(pathThree).schema + val schema = spark.read.parquet(pathThree).schema assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) val pathFour = s"${dir.getCanonicalPath}/table4" @@ -407,7 +407,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "s.c = 1" filter gets pushed down, this query will throw an exception which // Parquet emits. - val dfStruct3 = sqlContext.read.parquet(pathFour, pathFive).filter("s.c = 1") + val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1") .selectExpr("s") checkAnswer(dfStruct3, Row(Row(null, 1))) @@ -420,7 +420,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex dfStruct3.write.parquet(pathSix) // We will remove the temporary metadata when writing Parquet file. - val forPathSix = sqlContext.read.parquet(pathSix).schema + val forPathSix = spark.read.parquet(pathSix).schema assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) // sanity test: make sure optional metadata field is not wrongly set. 
@@ -429,7 +429,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex val pathEight = s"${dir.getCanonicalPath}/table8" (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight) - val df2 = sqlContext.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b") + val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b") checkAnswer( df2, Row(1, "1")) @@ -449,7 +449,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex withTempPath { dir => val path = s"${dir.getCanonicalPath}/part=1" (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) - val df = sqlContext.read.parquet(path).filter("a = 2") + val df = spark.read.parquet(path).filter("a = 2") // The result should be single row. // When a filter is pushed to Parquet, Parquet can apply it to every row. @@ -470,11 +470,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path) checkAnswer( - sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"), + spark.read.parquet(path).where("not (a = 2) or not(b in ('1'))"), (1 to 5).map(i => Row(i, (i % 2).toString))) checkAnswer( - sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"), + spark.read.parquet(path).where("not (a = 2 and b in ('1'))"), (1 to 5).map(i => Row(i, (i % 2).toString))) } } @@ -527,19 +527,19 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // When a filter is pushed to Parquet, Parquet can apply it to every row. // So, we can check the number of rows returned from the Parquet // to make sure our filter pushdown work. - val df = sqlContext.read.parquet(path).where("b in (0,2)") + val df = spark.read.parquet(path).where("b in (0,2)") assert(stripSparkFilter(df).count == 3) - val df1 = sqlContext.read.parquet(path).where("not (b in (1))") + val df1 = spark.read.parquet(path).where("not (b in (1))") assert(stripSparkFilter(df1).count == 3) - val df2 = sqlContext.read.parquet(path).where("not (b in (1,3) or a <= 2)") + val df2 = spark.read.parquet(path).where("not (b in (1,3) or a <= 2)") assert(stripSparkFilter(df2).count == 2) - val df3 = sqlContext.read.parquet(path).where("not (b in (1,3) and a <= 2)") + val df3 = spark.read.parquet(path).where("not (b in (1,3) and a <= 2)") assert(stripSparkFilter(df3).count == 4) - val df4 = sqlContext.read.parquet(path).where("not (a <= 2)") + val df4 = spark.read.parquet(path).where("not (a <= 2)") assert(stripSparkFilter(df4).count == 3) } } http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 32fe5ba..d0107aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -113,7 +113,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = sqlContext.sessionState.newHadoopConf() + val conf = 
spark.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf) readParquetFile(path.toString)(df => { val sparkTypes = df.schema.map(_.dataType) @@ -132,7 +132,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { testStandardAndLegacyModes("fixed-length decimals") { def makeDecimalRDD(decimal: DecimalType): DataFrame = { - sqlContext + spark .range(1000) // Parquet doesn't allow column names with spaces, have to add an alias here. // Minus 500 here so that negative decimals are also tested. @@ -250,10 +250,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = sqlContext.sessionState.newHadoopConf() + val conf = spark.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf) val errorMessage = intercept[Throwable] { - sqlContext.read.parquet(path.toString).printSchema() + spark.read.parquet(path.toString).printSchema() }.toString assert(errorMessage.contains("Parquet type not supported")) } @@ -271,15 +271,15 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val path = new Path(location.getCanonicalPath) - val conf = sqlContext.sessionState.newHadoopConf() + val conf = spark.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf) - val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType) + val sparkTypes = spark.read.parquet(path.toString).schema.map(_.dataType) assert(sparkTypes === expectedSparkTypes) } } test("compression codec") { - val hadoopConf = sqlContext.sessionState.newHadoopConf() + val hadoopConf = spark.sessionState.newHadoopConf() def compressionCodecFor(path: String, codecName: String): String = { val codecs = for { footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf) @@ -296,7 +296,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { def checkCompressionCodec(codec: CompressionCodecName): Unit = { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => - assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) { + assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase) { compressionCodecFor(path, codec.name()) } } @@ -304,7 +304,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } // Checks default compression codec - checkCompressionCodec(CompressionCodecName.fromConf(sqlContext.conf.parquetCompressionCodec)) + checkCompressionCodec( + CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) checkCompressionCodec(CompressionCodecName.GZIP) @@ -351,7 +352,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("write metadata") { - val hadoopConf = sqlContext.sessionState.newHadoopConf() + val hadoopConf = spark.sessionState.newHadoopConf() withTempPath { file => val path = new Path(file.toURI.toString) val fs = FileSystem.getLocal(hadoopConf) @@ -433,7 +434,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { location => val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) val path = new Path(location.getCanonicalPath) - val conf = sqlContext.sessionState.newHadoopConf() + val conf = spark.sessionState.newHadoopConf() writeMetadata(parquetSchema, path, conf, 
extraMetadata) readParquetFile(path.toString) { df => @@ -455,7 +456,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { ) withTempPath { dir => val message = intercept[SparkException] { - sqlContext.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) + spark.range(0, 1).write.options(extraOptions).parquet(dir.getCanonicalPath) }.getCause.getMessage assert(message === "Intentional exception for testing purposes") } @@ -465,10 +466,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { // In 1.3.0, save to fs other than file: without configuring core-site.xml would get: // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:/// intercept[Throwable] { - sqlContext.read.parquet("file:///nonexistent") + spark.read.parquet("file:///nonexistent") } val errorMessage = intercept[Throwable] { - sqlContext.read.parquet("hdfs://nonexistent") + spark.read.parquet("hdfs://nonexistent") }.toString assert(errorMessage.contains("UnknownHostException")) } @@ -486,14 +487,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { dir => val m1 = intercept[SparkException] { - sqlContext.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) + spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) }.getCause.getMessage assert(m1.contains("Intentional exception for testing purposes")) } withTempPath { dir => val m2 = intercept[SparkException] { - val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1) + val df = spark.range(1).select('id as 'a, 'id as 'b).coalesce(1) df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) }.getCause.getMessage assert(m2.contains("Intentional exception for testing purposes")) @@ -512,11 +513,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { ParquetOutputFormat.ENABLE_DICTIONARY -> "true" ) - val hadoopConf = sqlContext.sessionState.newHadoopConfWithOptions(extraOptions) + val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) withTempPath { dir => val path = s"${dir.getCanonicalPath}/part-r-0.parquet" - sqlContext.range(1 << 16).selectExpr("(id % 4) AS i") + spark.range(1 << 16).selectExpr("(id % 4) AS i") .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head @@ -531,7 +532,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("null and non-null strings") { // Create a dataset where the first values are NULL and then some non-null values. The // number of non-nulls needs to be bigger than the ParquetReader batch size. 
- val data: Dataset[String] = sqlContext.range(200).map (i => + val data: Dataset[String] = spark.range(200).map (i => if (i < 150) null else "a" ) @@ -554,7 +555,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { checkAnswer( // Decimal column in this file is encoded using plain dictionary readResourceParquetFile("dec-in-i32.parquet"), - sqlContext.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec)) + spark.range(1 << 4).select('id % 10 cast DecimalType(5, 2) as 'i32_dec)) } } } @@ -565,7 +566,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { checkAnswer( // Decimal column in this file is encoded using plain dictionary readResourceParquetFile("dec-in-i64.parquet"), - sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec)) + spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec)) } } } @@ -576,7 +577,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { checkAnswer( // Decimal column in this file is encoded using plain dictionary readResourceParquetFile("dec-in-fixed-len.parquet"), - sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec)) + spark.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec)) } } } @@ -589,7 +590,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { var hash2: Int = 0 (false :: true :: Nil).foreach { v => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> v.toString) { - val df = sqlContext.read.parquet(dir.getCanonicalPath) + val df = spark.read.parquet(dir.getCanonicalPath) val rows = df.queryExecution.toRdd.map(_.copy()).collect() val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow]) if (!v) { @@ -607,7 +608,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("VectorizedParquetRecordReader - direct path read") { val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString)) withTempPath { dir => - sqlContext.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath) + spark.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath) val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0); { val reader = new VectorizedParquetRecordReader http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 83b65fb..9dc5629 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -81,7 +81,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS logParquetSchema(protobufStylePath) checkAnswer( - sqlContext.read.parquet(dir.getCanonicalPath), + spark.read.parquet(dir.getCanonicalPath), Seq( Row(Seq(0, 1)), Row(Seq(2, 3)))) http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index b4d35be..8707e13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -400,7 +400,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha // Introduce _temporary dir to the base dir the robustness of the schema discovery process. new File(base.getCanonicalPath, "_temporary").mkdir() - sqlContext.read.parquet(base.getCanonicalPath).registerTempTable("t") + spark.read.parquet(base.getCanonicalPath).registerTempTable("t") withTempTable("t") { checkAnswer( @@ -484,7 +484,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) } - sqlContext.read.parquet(base.getCanonicalPath).registerTempTable("t") + spark.read.parquet(base.getCanonicalPath).registerTempTable("t") withTempTable("t") { checkAnswer( @@ -532,7 +532,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) } - val parquetRelation = sqlContext.read.format("parquet").load(base.getCanonicalPath) + val parquetRelation = spark.read.format("parquet").load(base.getCanonicalPath) parquetRelation.registerTempTable("t") withTempTable("t") { @@ -572,7 +572,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) } - val parquetRelation = sqlContext.read.format("parquet").load(base.getCanonicalPath) + val parquetRelation = spark.read.format("parquet").load(base.getCanonicalPath) parquetRelation.registerTempTable("t") withTempTable("t") { @@ -604,7 +604,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"), makePartitionDir(base, defaultPartitionName, "pi" -> 2)) - sqlContext + spark .read .option("mergeSchema", "true") .format("parquet") @@ -622,7 +622,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha test("SPARK-7749 Non-partitioned table should have empty partition spec") { withTempPath { dir => (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) - val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution + val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { case LogicalRelation(relation: HadoopFsRelation, _, _) => assert(relation.partitionSpec === PartitionSpec.emptySpec) @@ -636,7 +636,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha withTempPath { dir => val df = Seq("/", "[]", "?").zipWithIndex.map(_.swap).toDF("i", "s") df.write.format("parquet").partitionBy("s").save(dir.getCanonicalPath) - checkAnswer(sqlContext.read.parquet(dir.getCanonicalPath), df.collect()) + checkAnswer(spark.read.parquet(dir.getCanonicalPath), df.collect()) } } @@ -676,12 +676,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha } val schema = StructType(partitionColumns :+ StructField(s"i", StringType)) - val df = sqlContext.createDataFrame(sparkContext.parallelize(row :: Nil), schema) + val df = spark.createDataFrame(sparkContext.parallelize(row :: Nil), schema) withTempPath { dir => df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) val fields = schema.map(f => Column(f.name).cast(f.dataType)) - checkAnswer(sqlContext.read.load(dir.toString).select(fields: _*), row) + checkAnswer(spark.read.load(dir.toString).select(fields: _*), row) } } @@ -697,7 +697,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Files.touch(new File(s"${dir.getCanonicalPath}/b=1", ".DS_Store")) Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar")) - checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df) + checkAnswer(spark.read.format("parquet").load(dir.getCanonicalPath), df) } } @@ -714,7 +714,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS")) Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar")) - checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df) + checkAnswer(spark.read.format("parquet").load(tablePath.getCanonicalPath), df) } withTempPath { dir => @@ -731,7 +731,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS")) Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar")) - checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df) + checkAnswer(spark.read.format("parquet").load(tablePath.getCanonicalPath), df) } } @@ -746,7 +746,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha .save(tablePath.getCanonicalPath) val twoPartitionsDF = - sqlContext + spark .read .option("basePath", tablePath.getCanonicalPath) .parquet( @@ -756,7 +756,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha checkAnswer(twoPartitionsDF, df.filter("b != 3")) intercept[AssertionError] { - sqlContext + spark .read .parquet( s"${tablePath.getCanonicalPath}/b=1", @@ -829,7 +829,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1", "_SUCCESS")) Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1", "_SUCCESS")) Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1/d=1", "_SUCCESS")) - checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df) + checkAnswer(spark.read.format("parquet").load(tablePath.getCanonicalPath), df) } } } @@ -884,9 +884,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha withTempPath { dir => withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") { val path = dir.getCanonicalPath - val df = sqlContext.range(5).select('id as 'a, 'id as 'b, 'id as 'c).coalesce(1) + val df = spark.range(5).select('id as 'a, 'id as 'b, 'id as 'c).coalesce(1) df.write.partitionBy("b", "c").parquet(path) - checkAnswer(sqlContext.read.parquet(path), df) + checkAnswer(spark.read.parquet(path), df) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5bf74b44/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index f1e9726..f9f9f80 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -46,24 +46,24 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("appending") { val data = (0 until 10).map(i => (i, i.toString)) - sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") + spark.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") // Query appends, don't test with both read modes. withParquetTable(data, "t", false) { sql("INSERT INTO TABLE t SELECT * FROM tmp") - checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple)) + checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple)) } - sqlContext.sessionState.catalog.dropTable( + spark.sessionState.catalog.dropTable( TableIdentifier("tmp"), ignoreIfNotExists = true) } test("overwriting") { val data = (0 until 10).map(i => (i, i.toString)) - sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") + spark.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") withParquetTable(data, "t") { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") - checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple)) + checkAnswer(spark.table("t"), data.map(Row.fromTuple)) } - sqlContext.sessionState.catalog.dropTable( + spark.sessionState.catalog.dropTable( TableIdentifier("tmp"), ignoreIfNotExists = true) } @@ -128,9 +128,9 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext val schema = StructType(List(StructField("d", DecimalType(18, 0), false), StructField("time", TimestampType, false)).toArray) withTempPath { file => - val df = sqlContext.createDataFrame(sparkContext.parallelize(data), schema) + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) df.write.parquet(file.getCanonicalPath) - val df2 = sqlContext.read.parquet(file.getCanonicalPath) + val df2 = spark.read.parquet(file.getCanonicalPath) checkAnswer(df2, df.collect().toSeq) } } @@ -139,12 +139,12 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext def testSchemaMerging(expectedColumnNumber: Int): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath - sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) - sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString) + spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) + spark.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString) // delete summary files, so if we don't merge part-files, one column will not be included. 
Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata")) Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata")) - assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber) + assert(spark.read.parquet(basePath).columns.length === expectedColumnNumber) } } @@ -163,9 +163,9 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext def testSchemaMerging(expectedColumnNumber: Int): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath - sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) - sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString) - assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber) + spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) + spark.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString) + assert(spark.read.parquet(basePath).columns.length === expectedColumnNumber) } } @@ -181,19 +181,19 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") { withTempPath { dir => val basePath = dir.getCanonicalPath - sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) - sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=a").toString) + spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) + spark.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=a").toString) // Disables the global SQL option for schema merging withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") { assertResult(2) { // Disables schema merging via data source option - sqlContext.read.option("mergeSchema", "false").parquet(basePath).columns.length + spark.read.option("mergeSchema", "false").parquet(basePath).columns.length } assertResult(3) { // Enables schema merging via data source option - sqlContext.read.option("mergeSchema", "true").parquet(basePath).columns.length + spark.read.option("mergeSchema", "true").parquet(basePath).columns.length } } } @@ -204,10 +204,10 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext val basePath = dir.getCanonicalPath val schema = StructType(Array(StructField("name", DecimalType(10, 5), false))) val rowRDD = sparkContext.parallelize(Array(Row(Decimal("67123.45")))) - val df = sqlContext.createDataFrame(rowRDD, schema) + val df = spark.createDataFrame(rowRDD, schema) df.write.parquet(basePath) - val decimal = sqlContext.read.parquet(basePath).first().getDecimal(0) + val decimal = spark.read.parquet(basePath).first().getDecimal(0) assert(Decimal("67123.45") === Decimal(decimal)) } } @@ -227,7 +227,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") { checkAnswer( - sqlContext.read.option("mergeSchema", "true").parquet(path), + spark.read.option("mergeSchema", "true").parquet(path), Seq( Row(Row(1, 1, null)), Row(Row(2, 2, null)), @@ -240,7 +240,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("SPARK-10301 requested schema clipping - same schema") { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) + val df = spark.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS 
s").coalesce(1) df.write.parquet(path) val userDefinedSchema = @@ -253,7 +253,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(0L, 1L))) } } @@ -261,12 +261,12 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("SPARK-11997 parquet with null partition values") { withTempPath { dir => val path = dir.getCanonicalPath - sqlContext.range(1, 3) + spark.range(1, 3) .selectExpr("if(id % 2 = 0, null, id) AS n", "id") .write.partitionBy("n").parquet(path) checkAnswer( - sqlContext.read.parquet(path).filter("n is null"), + spark.read.parquet(path).filter("n is null"), Row(2, null)) } } @@ -275,7 +275,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext ignore("SPARK-10301 requested schema clipping - schemas with disjoint sets of fields") { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) + val df = spark.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) df.write.parquet(path) val userDefinedSchema = @@ -288,7 +288,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(null, null))) } } @@ -296,7 +296,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("SPARK-10301 requested schema clipping - requested schema contains physical schema") { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) + val df = spark.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 1) AS s").coalesce(1) df.write.parquet(path) val userDefinedSchema = @@ -311,13 +311,13 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(0L, 1L, null, null))) } withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'd', id + 3) AS s").coalesce(1) + val df = spark.range(1).selectExpr("NAMED_STRUCT('a', id, 'd', id + 3) AS s").coalesce(1) df.write.parquet(path) val userDefinedSchema = @@ -332,7 +332,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(0L, null, null, 3L))) } } @@ -340,7 +340,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("SPARK-10301 requested schema clipping - physical schema contains requested schema") { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext + val df = spark .range(1) .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s") .coalesce(1) @@ -357,13 +357,13 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(0L, 1L))) } 
withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext + val df = spark .range(1) .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 3) AS s") .coalesce(1) @@ -380,7 +380,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(0L, 3L))) } } @@ -388,7 +388,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext test("SPARK-10301 requested schema clipping - schemas overlap but don't contain each other") { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext + val df = spark .range(1) .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s") .coalesce(1) @@ -406,7 +406,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(1L, 2L, null))) } } @@ -415,7 +415,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext + val df = spark .range(1) .selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) AS s") .coalesce(1) @@ -436,7 +436,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(Seq(Row(0, null))))) } } @@ -445,12 +445,12 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext withTempPath { dir => val path = dir.getCanonicalPath - val df1 = sqlContext + val df1 = spark .range(1) .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s") .coalesce(1) - val df2 = sqlContext + val df2 = spark .range(1, 2) .selectExpr("NAMED_STRUCT('c', id + 2, 'b', id + 1, 'd', id + 3) AS s") .coalesce(1) @@ -467,7 +467,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Seq( Row(Row(0, 1, null)), Row(Row(null, 2, 4)))) @@ -478,12 +478,12 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext withTempPath { dir => val path = dir.getCanonicalPath - val df1 = sqlContext + val df1 = spark .range(1) .selectExpr("NAMED_STRUCT('a', id, 'c', id + 2) AS s") .coalesce(1) - val df2 = sqlContext + val df2 = spark .range(1, 2) .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s") .coalesce(1) @@ -492,7 +492,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext df2.write.mode(SaveMode.Append).parquet(path) checkAnswer( - sqlContext + spark .read .option("mergeSchema", "true") .parquet(path) @@ -507,7 +507,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext + val df = spark .range(1) .selectExpr( """NAMED_STRUCT( @@ -532,7 +532,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext nullable = true) checkAnswer( - sqlContext.read.schema(userDefinedSchema).parquet(path), + spark.read.schema(userDefinedSchema).parquet(path), Row(Row(NestedStruct(1, 
2L, 3.5D)))) } } @@ -585,9 +585,9 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*) + val df = spark.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*) df.write.mode(SaveMode.Overwrite).parquet(path) - checkAnswer(sqlContext.read.parquet(path), df) + checkAnswer(spark.read.parquet(path), df) } } @@ -595,11 +595,11 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext withSQLConf("spark.sql.codegen.maxFields" -> "100") { withTempPath { dir => val path = dir.getCanonicalPath - val df = sqlContext.range(100).select(Seq.tabulate(110) {i => ('id + i).as(s"c$i")} : _*) + val df = spark.range(100).select(Seq.tabulate(110) {i => ('id + i).as(s"c$i")} : _*) df.write.mode(SaveMode.Overwrite).parquet(path) // donot return batch, because whole stage codegen is disabled for wide table (>200 columns) - val df2 = sqlContext.read.parquet(path) + val df2 = spark.read.parquet(path) assert(df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isEmpty, "Should not return batch") checkAnswer(df2, df)
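For readers following the sqlContext -> spark replacement applied throughout the hunks above, the sketch below is not part of the commit; it only illustrates the same pattern outside the test harness. The application name, output path, and sample data are made up for illustration: a SparkSession bound to a val named spark takes over the entry points (range, createDataFrame, read, table) that SQLContext previously provided, and reader options such as mergeSchema are passed the same way on either API.

    // Minimal sketch, assuming Spark 2.x on the classpath; names and paths are hypothetical.
    import org.apache.spark.sql.SparkSession

    object SparkSessionMigrationSketch {
      def main(args: Array[String]): Unit = {
        // SparkSession replaces SQLContext as the entry point.
        val spark = SparkSession.builder()
          .appName("migration-sketch")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val path = "/tmp/migration-sketch-parquet" // hypothetical output location

        // Before: sqlContext.range(...) / sqlContext.read.parquet(...)
        // After:  spark.range(...)      / spark.read.parquet(...)
        val df = spark.range(10).select($"id", ($"id" % 3).as("bucket"))
        df.write.mode("overwrite").partitionBy("bucket").parquet(path)

        // Reader options such as mergeSchema are unchanged by the migration.
        val readBack = spark.read.option("mergeSchema", "true").parquet(path)
        readBack.show()

        spark.stop()
      }
    }

The helpers that appear in the diffs themselves (checkAnswer, withTempPath, withSQLConf, withParquetTable) come from Spark's test harness and are not needed in application code.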