Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f41be8fb3 -> 9da55b570


http://git-wip-us.apache.org/repos/asf/spark/blob/9da55b57/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index b6be09e..a0075f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -688,11 +688,11 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
 
     val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
     val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
-    intercept[Throwable](df2.saveAsParquetFile(filePath))
+    intercept[Throwable](df2.write.parquet(filePath))
 
     val df3 = df2.toDF("str", "max_int")
-    df3.saveAsParquetFile(filePath2)
-    val df4 = parquetFile(filePath2)
+    df3.write.parquet(filePath2)
+    val df4 = read.parquet(filePath2)
     checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
     assert(df4.columns === Array("str", "max_int"))
   }
@@ -731,14 +731,14 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
       sparkContext.makeRDD(1 to 10)
         .map(i => ParquetData(i, s"part-$p"))
         .toDF()
-        .saveAsParquetFile(partDir.getCanonicalPath)
+        .write.parquet(partDir.getCanonicalPath)
     }
 
     sparkContext
       .makeRDD(1 to 10)
       .map(i => ParquetData(i, s"part-1"))
       .toDF()
-      .saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
+      .write.parquet(new File(normalTableDir, "normal").getCanonicalPath)
 
     partitionedTableDirWithKey = Utils.createTempDir()
 
@@ -747,7 +747,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
       sparkContext.makeRDD(1 to 10)
         .map(i => ParquetDataWithKey(p, i, s"part-$p"))
         .toDF()
-        .saveAsParquetFile(partDir.getCanonicalPath)
+        .write.parquet(partDir.getCanonicalPath)
     }
 
     partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
@@ -757,7 +757,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
       sparkContext.makeRDD(1 to 10).map { i =>
         ParquetDataWithKeyAndComplexTypes(
           p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
-      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+      }.toDF().write.parquet(partDir.getCanonicalPath)
     }
 
     partitionedTableDirWithComplexTypes = Utils.createTempDir()
@@ -766,7 +766,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
       val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
       sparkContext.makeRDD(1 to 10).map { i =>
         ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
-      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+      }.toDF().write.parquet(partDir.getCanonicalPath)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9da55b57/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index cf6afd2..f44b3c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -92,44 +92,27 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
 
   test("save()/load() - non-partitioned table - Overwrite") {
     withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
+      testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
+      testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
 
       checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
+        read.format(dataSourceName)
+          .option("path", file.getCanonicalPath)
+          .option("dataSchema", dataSchema.json)
+          .load(),
         testDF.collect())
     }
   }
 
   test("save()/load() - non-partitioned table - Append") {
     withTempPath { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Overwrite)
-
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Append)
+      testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath)
+      testDF.write.mode(SaveMode.Append).format(dataSourceName).save(file.getCanonicalPath)
 
       checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)).orderBy("a"),
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchema.json)
+          .load(file.getCanonicalPath).orderBy("a"),
         testDF.unionAll(testDF).orderBy("a").collect())
     }
   }
@@ -147,10 +130,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
 
   test("save()/load() - non-partitioned table - Ignore") {
     withTempDir { file =>
-      testDF.save(
-        path = file.getCanonicalPath,
-        source = dataSourceName,
-        mode = SaveMode.Ignore)
+      testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath)
 
       val path = new Path(file.getCanonicalPath)
       val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -160,89 +140,81 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
 
   test("save()/load() - partitioned table - simple queries") {
     withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.ErrorIfExists,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
+      partitionedTestDF.write
+        .format(dataSourceName)
+        .mode(SaveMode.ErrorIfExists)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
 
       checkQueries(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)))
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchema.json)
+          .load(file.getCanonicalPath))
     }
   }
 
   test("save()/load() - partitioned table - Overwrite") {
     withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
+      partitionedTestDF.write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
+
+      partitionedTestDF.write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
 
       checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchema.json)
+          .load(file.getCanonicalPath),
         partitionedTestDF.collect())
     }
   }
 
   test("save()/load() - partitioned table - Append") {
     withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
+      partitionedTestDF.write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
+
+      partitionedTestDF.write
+        .format(dataSourceName)
+        .mode(SaveMode.Append)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
 
       checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchema.json)
+          .load(file.getCanonicalPath),
         partitionedTestDF.unionAll(partitionedTestDF).collect())
     }
   }
 
   test("save()/load() - partitioned table - Append - new partition values") {
     withTempPath { file =>
-      partitionedTestDF1.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      partitionedTestDF2.save(
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
+      partitionedTestDF1.write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
+
+      partitionedTestDF2.write
+        .format(dataSourceName)
+        .mode(SaveMode.Append)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
 
       checkAnswer(
-        load(
-          source = dataSourceName,
-          options = Map(
-            "path" -> file.getCanonicalPath,
-            "dataSchema" -> dataSchema.json)),
+        read.format(dataSourceName)
+          .option("dataSchema", dataSchema.json)
+          .load(file.getCanonicalPath),
         partitionedTestDF.collect())
     }
   }
@@ -250,11 +222,11 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   test("save()/load() - partitioned table - ErrorIfExists") {
     withTempDir { file =>
       intercept[RuntimeException] {
-        partitionedTestDF.save(
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("path" -> file.getCanonicalPath),
-          partitionColumns = Seq("p1", "p2"))
+        partitionedTestDF.write
+          .format(dataSourceName)
+          .mode(SaveMode.ErrorIfExists)
+          .partitionBy("p1", "p2")
+          .save(file.getCanonicalPath)
       }
     }
   }
@@ -343,19 +315,19 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   }
 
   test("saveAsTable()/load() - partitioned table - Overwrite") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
+    partitionedTestDF.write
+      .format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .option("dataSchema", dataSchema.json)
+      .partitionBy("p1", "p2")
+      .saveAsTable("t")
+
+    partitionedTestDF.write
+      .format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .option("dataSchema", dataSchema.json)
+      .partitionBy("p1", "p2")
+      .saveAsTable("t")
 
     withTable("t") {
       checkAnswer(table("t"), partitionedTestDF.collect())
@@ -363,19 +335,19 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   }
 
   test("saveAsTable()/load() - partitioned table - Append") {
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
+    partitionedTestDF.write
+      .format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .option("dataSchema", dataSchema.json)
+      .partitionBy("p1", "p2")
+      .saveAsTable("t")
+
+    partitionedTestDF.write
+      .format(dataSourceName)
+      .mode(SaveMode.Append)
+      .option("dataSchema", dataSchema.json)
+      .partitionBy("p1", "p2")
+      .saveAsTable("t")
 
     withTable("t") {
       checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect())
@@ -383,19 +355,19 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   }
 
   test("saveAsTable()/load() - partitioned table - Append - new partition values") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
-
-    partitionedTestDF2.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Append,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
+    partitionedTestDF1.write
+      .format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .option("dataSchema", dataSchema.json)
+      .partitionBy("p1", "p2")
+      .saveAsTable("t")
+
+    partitionedTestDF2.write
+      .format(dataSourceName)
+      .mode(SaveMode.Append)
+      .option("dataSchema", dataSchema.json)
+      .partitionBy("p1", "p2")
+      .saveAsTable("t")
 
     withTable("t") {
       checkAnswer(table("t"), partitionedTestDF.collect())
@@ -403,31 +375,31 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
   }
 
   test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
-    partitionedTestDF1.saveAsTable(
-      tableName = "t",
-      source = dataSourceName,
-      mode = SaveMode.Overwrite,
-      options = Map("dataSchema" -> dataSchema.json),
-      partitionColumns = Seq("p1", "p2"))
+    partitionedTestDF1.write
+      .format(dataSourceName)
+      .mode(SaveMode.Overwrite)
+      .option("dataSchema", dataSchema.json)
+      .partitionBy("p1", "p2")
+      .saveAsTable("t")
 
     // Using only a subset of all partition columns
     intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1"))
+      partitionedTestDF2.write
+        .format(dataSourceName)
+        .mode(SaveMode.Append)
+        .option("dataSchema", dataSchema.json)
+        .partitionBy("p1")
+        .saveAsTable("t")
     }
 
     // Using different order of partition columns
     intercept[Throwable] {
-      partitionedTestDF2.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Append,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p2", "p1"))
+      partitionedTestDF2.write
+        .format(dataSourceName)
+        .mode(SaveMode.Append)
+        .option("dataSchema", dataSchema.json)
+        .partitionBy("p2", "p1")
+        .saveAsTable("t")
     }
   }
 
@@ -436,12 +408,12 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
 
     withTempTable("t") {
       intercept[AnalysisException] {
-        partitionedTestDF.saveAsTable(
-          tableName = "t",
-          source = dataSourceName,
-          mode = SaveMode.ErrorIfExists,
-          options = Map("dataSchema" -> dataSchema.json),
-          partitionColumns = Seq("p1", "p2"))
+        partitionedTestDF.write
+          .format(dataSourceName)
+          .mode(SaveMode.ErrorIfExists)
+          .option("dataSchema", dataSchema.json)
+          .partitionBy("p1", "p2")
+          .saveAsTable("t")
       }
     }
   }
@@ -450,12 +422,12 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
     Seq.empty[(Int, String)].toDF().registerTempTable("t")
 
     withTempTable("t") {
-      partitionedTestDF.saveAsTable(
-        tableName = "t",
-        source = dataSourceName,
-        mode = SaveMode.Ignore,
-        options = Map("dataSchema" -> dataSchema.json),
-        partitionColumns = Seq("p1", "p2"))
+      partitionedTestDF.write
+        .format(dataSourceName)
+        .mode(SaveMode.Ignore)
+        .option("dataSchema", dataSchema.json)
+        .partitionBy("p1", "p2")
+        .saveAsTable("t")
 
       assert(table("t").collect().isEmpty)
     }
@@ -463,17 +435,16 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
 
   test("Hadoop style globbing") {
     withTempPath { file =>
-      partitionedTestDF.save(
-        source = dataSourceName,
-        mode = SaveMode.Overwrite,
-        options = Map("path" -> file.getCanonicalPath),
-        partitionColumns = Seq("p1", "p2"))
-
-      val df = load(
-        source = dataSourceName,
-        options = Map(
-          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
-          "dataSchema" -> dataSchema.json))
+      partitionedTestDF.write
+        .format(dataSourceName)
+        .mode(SaveMode.Overwrite)
+        .partitionBy("p1", "p2")
+        .save(file.getCanonicalPath)
+
+      val df = read
+        .format(dataSourceName)
+        .option("dataSchema", dataSchema.json)
+        .load(s"${file.getCanonicalPath}/p1=*/p2=???")
 
       val expectedPaths = Set(
         s"${file.getCanonicalFile}/p1=1/p2=foo",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to