Repository: spark Updated Branches: refs/heads/master deb411335 -> 578bfeeff
http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index b6be09e..a0075f1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -688,11 +688,11 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str") val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int") - intercept[Throwable](df2.saveAsParquetFile(filePath)) + intercept[Throwable](df2.write.parquet(filePath)) val df3 = df2.toDF("str", "max_int") - df3.saveAsParquetFile(filePath2) - val df4 = parquetFile(filePath2) + df3.write.parquet(filePath2) + val df4 = read.parquet(filePath2) checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil) assert(df4.columns === Array("str", "max_int")) } @@ -731,14 +731,14 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll sparkContext.makeRDD(1 to 10) .map(i => ParquetData(i, s"part-$p")) .toDF() - .saveAsParquetFile(partDir.getCanonicalPath) + .write.parquet(partDir.getCanonicalPath) } sparkContext .makeRDD(1 to 10) .map(i => ParquetData(i, s"part-1")) .toDF() - .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath) + .write.parquet(new File(normalTableDir, "normal").getCanonicalPath) partitionedTableDirWithKey = Utils.createTempDir() @@ -747,7 +747,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll sparkContext.makeRDD(1 to 10) .map(i => ParquetDataWithKey(p, i, s"part-$p")) .toDF() - .saveAsParquetFile(partDir.getCanonicalPath) + .write.parquet(partDir.getCanonicalPath) } partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir() @@ -757,7 +757,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll sparkContext.makeRDD(1 to 10).map { i => ParquetDataWithKeyAndComplexTypes( p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) - }.toDF().saveAsParquetFile(partDir.getCanonicalPath) + }.toDF().write.parquet(partDir.getCanonicalPath) } partitionedTableDirWithComplexTypes = Utils.createTempDir() @@ -766,7 +766,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") sparkContext.makeRDD(1 to 10).map { i => ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) - }.toDF().saveAsParquetFile(partDir.getCanonicalPath) + }.toDF().write.parquet(partDir.getCanonicalPath) } } http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index cf6afd2..f44b3c5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -92,44 +92,27 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { test("save()/load() - non-partitioned table - Overwrite") { withTempPath { file => - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Overwrite) - - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Overwrite) + testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) + testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), + read.format(dataSourceName) + .option("path", file.getCanonicalPath) + .option("dataSchema", dataSchema.json) + .load(), testDF.collect()) } } test("save()/load() - non-partitioned table - Append") { withTempPath { file => - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Overwrite) - - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Append) + testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) + testDF.write.mode(SaveMode.Append).format(dataSourceName).save(file.getCanonicalPath) checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)).orderBy("a"), + read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath).orderBy("a"), testDF.unionAll(testDF).orderBy("a").collect()) } } @@ -147,10 +130,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { test("save()/load() - non-partitioned table - Ignore") { withTempDir { file => - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Ignore) + testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath) val path = new Path(file.getCanonicalPath) val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) @@ -160,89 +140,81 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { test("save()/load() - partitioned table - simple queries") { withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.ErrorIfExists, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.ErrorIfExists) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) checkQueries( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json))) + read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath)) } } test("save()/load() - partitioned table - Overwrite") { withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), + read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath), partitionedTestDF.collect()) } } test("save()/load() - partitioned table - Append") { withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Append, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Append) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), + read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath), partitionedTestDF.unionAll(partitionedTestDF).collect()) } } test("save()/load() - partitioned table - Append - new partition values") { withTempPath { file => - partitionedTestDF1.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF2.save( - source = dataSourceName, - mode = SaveMode.Append, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF1.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + partitionedTestDF2.write + .format(dataSourceName) + .mode(SaveMode.Append) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), + read.format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(file.getCanonicalPath), partitionedTestDF.collect()) } } @@ -250,11 +222,11 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { test("save()/load() - partitioned table - ErrorIfExists") { withTempDir { file => intercept[RuntimeException] { - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.ErrorIfExists, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.ErrorIfExists) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) } } } @@ -343,19 +315,19 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { } test("saveAsTable()/load() - partitioned table - Overwrite") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") withTable("t") { checkAnswer(table("t"), partitionedTestDF.collect()) @@ -363,19 +335,19 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { } test("saveAsTable()/load() - partitioned table - Append") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Append) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") withTable("t") { checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect()) @@ -383,19 +355,19 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { } test("saveAsTable()/load() - partitioned table - Append - new partition values") { - partitionedTestDF1.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF2.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF1.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") + + partitionedTestDF2.write + .format(dataSourceName) + .mode(SaveMode.Append) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") withTable("t") { checkAnswer(table("t"), partitionedTestDF.collect()) @@ -403,31 +375,31 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { } test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { - partitionedTestDF1.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF1.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") // Using only a subset of all partition columns intercept[Throwable] { - partitionedTestDF2.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1")) + partitionedTestDF2.write + .format(dataSourceName) + .mode(SaveMode.Append) + .option("dataSchema", dataSchema.json) + .partitionBy("p1") + .saveAsTable("t") } // Using different order of partition columns intercept[Throwable] { - partitionedTestDF2.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p2", "p1")) + partitionedTestDF2.write + .format(dataSourceName) + .mode(SaveMode.Append) + .option("dataSchema", dataSchema.json) + .partitionBy("p2", "p1") + .saveAsTable("t") } } @@ -436,12 +408,12 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { withTempTable("t") { intercept[AnalysisException] { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.ErrorIfExists, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.ErrorIfExists) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") } } } @@ -450,12 +422,12 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { Seq.empty[(Int, String)].toDF().registerTempTable("t") withTempTable("t") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Ignore, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Ignore) + .option("dataSchema", dataSchema.json) + .partitionBy("p1", "p2") + .saveAsTable("t") assert(table("t").collect().isEmpty) } @@ -463,17 +435,16 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { test("Hadoop style globbing") { withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - val df = load( - source = dataSourceName, - options = Map( - "path" -> s"${file.getCanonicalPath}/p1=*/p2=???", - "dataSchema" -> dataSchema.json)) + partitionedTestDF.write + .format(dataSourceName) + .mode(SaveMode.Overwrite) + .partitionBy("p1", "p2") + .save(file.getCanonicalPath) + + val df = read + .format(dataSourceName) + .option("dataSchema", dataSchema.json) + .load(s"${file.getCanonicalPath}/p1=*/p2=???") val expectedPaths = Set( s"${file.getCanonicalFile}/p1=1/p2=foo", --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
