Repository: spark Updated Branches: refs/heads/master 0c83f718e -> 17f469bc8
[SPARK-24860][SQL] Support setting of partitionOverWriteMode in output options for writing DataFrame ## What changes were proposed in this pull request? Besides spark setting spark.sql.sources.partitionOverwriteMode also allow setting partitionOverWriteMode per write ## How was this patch tested? Added unit test in InsertSuite Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Koert Kuipers <ko...@tresata.com> Closes #21818 from koertkuipers/feat-partition-overwrite-mode-per-write. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17f469bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17f469bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17f469bc Branch: refs/heads/master Commit: 17f469bc808e076b45fffcedb0147991fa4c41f3 Parents: 0c83f71 Author: Koert Kuipers <ko...@tresata.com> Authored: Wed Jul 25 13:06:03 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Wed Jul 25 13:06:03 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/internal/SQLConf.scala | 6 +++++- .../InsertIntoHadoopFsRelationCommand.scala | 9 +++++++-- .../apache/spark/sql/sources/InsertSuite.scala | 20 ++++++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/17f469bc/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d7c830d..53423e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1360,7 +1360,11 @@ 
object SQLConf { "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " + "those partitions that have data written into it at runtime. By default we use static " + "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " + - "affect Hive serde tables, as they are always overwritten with dynamic mode.") + "affect Hive serde tables, as they are always overwritten with dynamic mode. This can " + "also be set as an output option for a data source using key partitionOverwriteMode " + "(which takes precedence over this setting), e.g. " + "dataframe.write.option(\"partitionOverwriteMode\", \"dynamic\").save(path)." + ) .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValues(PartitionOverwriteMode.values.map(_.toString)) http://git-wip-us.apache.org/repos/asf/spark/blob/17f469bc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index dd7ef0d..8a2e00d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command._ import 
org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode @@ -91,8 +92,12 @@ case class InsertIntoHadoopFsRelationCommand( val pathExists = fs.exists(qualifiedOutputPath) - val enableDynamicOverwrite = - sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC + val parameters = CaseInsensitiveMap(options) + + val partitionOverwriteMode = parameters.get("partitionOverwriteMode") + .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase)) + .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode) + val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC // This config only makes sense when we are overwriting a partitioned dataset with dynamic // partition columns. val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite && http://git-wip-us.apache.org/repos/asf/spark/blob/17f469bc/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 438d5d8..0b6d939 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -545,6 +545,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { } } + test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") { + withTempPath { path => + Seq((1, 1), (2, 2)).toDF("i", "part") + .write.partitionBy("part") + .parquet(path.getAbsolutePath) + checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1) :: Row(2, 2) :: Nil) + + Seq((1, 2), (1, 3)).toDF("i", "part") + .write.partitionBy("part").mode("overwrite") + .option("partitionOverwriteMode", "dynamic").parquet(path.getAbsolutePath) + 
checkAnswer(spark.read.parquet(path.getAbsolutePath), + Row(1, 1) :: Row(1, 2) :: Row(1, 3) :: Nil) + + Seq((1, 2), (1, 3)).toDF("i", "part") + .write.partitionBy("part").mode("overwrite") + .option("partitionOverwriteMode", "static").parquet(path.getAbsolutePath) + checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 2) :: Row(1, 3) :: Nil) + } + } + test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") { withTable("test_table") { val schema = new StructType() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org