Repository: spark
Updated Branches:
  refs/heads/master 0c83f718e -> 17f469bc8


[SPARK-24860][SQL] Support setting partitionOverwriteMode in output options when writing a DataFrame

## What changes were proposed in this pull request?

In addition to the session-wide setting spark.sql.sources.partitionOverwriteMode, also allow setting partitionOverwriteMode per write, as an output option (see the sketch below).
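
For illustration, a minimal sketch of the two ways to control the mode once this change is in (the DataFrame name and output path here are hypothetical):

```scala
// Session-wide default, as before: applies to every subsequent write.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// New in this patch: set the mode for a single write via an output option.
// The per-write option takes precedence over the session conf.
df.write
  .partitionBy("part")
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .parquet("/tmp/out")
```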

## How was this patch tested?

Added a unit test in InsertSuite.


Author: Koert Kuipers <ko...@tresata.com>

Closes #21818 from koertkuipers/feat-partition-overwrite-mode-per-write.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17f469bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17f469bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17f469bc

Branch: refs/heads/master
Commit: 17f469bc808e076b45fffcedb0147991fa4c41f3
Parents: 0c83f71
Author: Koert Kuipers <ko...@tresata.com>
Authored: Wed Jul 25 13:06:03 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed Jul 25 13:06:03 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  6 +++++-
 .../InsertIntoHadoopFsRelationCommand.scala     |  9 +++++++--
 .../apache/spark/sql/sources/InsertSuite.scala  | 20 ++++++++++++++++++++
 3 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17f469bc/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d7c830d..53423e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1360,7 +1360,11 @@ object SQLConf {
         "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, 
and only overwrite " +
         "those partitions that have data written into it at runtime. By 
default we use static " +
         "mode to keep the same behavior of Spark prior to 2.3. Note that this 
config doesn't " +
-        "affect Hive serde tables, as they are always overwritten with dynamic 
mode.")
+        "affect Hive serde tables, as they are always overwritten with dynamic 
mode. This can " +
+        "also be set as an output option for a data source using key 
partitionOverwriteMode " +
+        "(which takes precendence over this setting), e.g. " +
+        "dataframe.write.option(\"partitionOverwriteMode\", 
\"dynamic\").save(path)."
+      )
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValues(PartitionOverwriteMode.values.map(_.toString))
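
As the `.transform(_.toUpperCase(Locale.ROOT))` and `.checkValues(...)` steps above suggest, the conf value is case-insensitive and validated against the enum. A self-contained sketch of that normalize-and-validate pattern (the `parseMode` helper is hypothetical, not part of the patch):

```scala
import java.util.Locale

// Mirrors the enumeration referenced in SQLConf.
object PartitionOverwriteMode extends Enumeration {
  val STATIC, DYNAMIC = Value
}

// Hypothetical helper showing the same normalize-then-validate flow:
// upper-case the raw value, then check it against the known modes.
def parseMode(raw: String): PartitionOverwriteMode.Value = {
  val normalized = raw.toUpperCase(Locale.ROOT)
  require(PartitionOverwriteMode.values.map(_.toString).contains(normalized),
    s"Invalid partitionOverwriteMode: $raw")
  PartitionOverwriteMode.withName(normalized)
}
```

So parseMode("dynamic"), parseMode("Dynamic") and parseMode("DYNAMIC") all resolve to DYNAMIC, while any other value fails fast.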

http://git-wip-us.apache.org/repos/asf/spark/blob/17f469bc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index dd7ef0d..8a2e00d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -91,8 +92,12 @@ case class InsertIntoHadoopFsRelationCommand(
 
     val pathExists = fs.exists(qualifiedOutputPath)
 
-    val enableDynamicOverwrite =
-      sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    val parameters = CaseInsensitiveMap(options)
+
+    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+      .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
+    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
     // This config only makes sense when we are overwriting a partitioned dataset with dynamic
     // partition columns.
     val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
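
The lookup above is doubly forgiving: `CaseInsensitiveMap` matches the option key regardless of case, and `toUpperCase` normalizes the value before `withName`. A self-contained sketch of the precedence rule, with the session default passed in as a plain string rather than read from a real `SQLConf` (the `resolveOverwriteMode` helper is hypothetical):

```scala
import java.util.Locale

// Hypothetical standalone version of the resolution above: a per-write
// option, when present, overrides the session-level default mode.
def resolveOverwriteMode(options: Map[String, String], sessionDefault: String): String = {
  // Approximate CaseInsensitiveMap by lower-casing option keys up front.
  val normalized = options.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
  normalized.get("partitionoverwritemode")
    .map(_.toUpperCase(Locale.ROOT))
    .getOrElse(sessionDefault)
}
```

E.g. resolveOverwriteMode(Map("PartitionOverwriteMode" -> "dynamic"), "STATIC") yields "DYNAMIC", while an empty options map falls back to "STATIC".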

http://git-wip-us.apache.org/repos/asf/spark/blob/17f469bc/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 438d5d8..0b6d939 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -545,6 +545,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-24860: dynamic partition overwrite specified per source without 
catalog table") {
+    withTempPath { path =>
+      Seq((1, 1), (2, 2)).toDF("i", "part")
+        .write.partitionBy("part")
+        .parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1) :: 
Row(2, 2) :: Nil)
+
+      Seq((1, 2), (1, 3)).toDF("i", "part")
+        .write.partitionBy("part").mode("overwrite")
+        .option("partitionOverwriteMode", 
"dynamic").parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath),
+        Row(1, 1) :: Row(1, 2) :: Row(1, 3) :: Nil)
+
+      Seq((1, 2), (1, 3)).toDF("i", "part")
+        .write.partitionBy("part").mode("overwrite")
+        .option("partitionOverwriteMode", 
"static").parquet(path.getAbsolutePath)
+      checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 2) :: 
Row(1, 3) :: Nil)
+    }
+  }
+
   test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
     withTable("test_table") {
       val schema = new StructType()

