Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1759cf69a -> 27a1a5c99
[SPARK-18544][SQL] Append with df.saveAsTable writes data to wrong location

## What changes were proposed in this pull request?

The saveAsTable command did not properly propagate the metadata of an existing table. This caused a downstream component to treat the table as MANAGED and write its data to the wrong location.

## How was this patch tested?

Unit test that fails before the patch.

Author: Eric Liang <[email protected]>

Closes #15983 from ericl/spark-18544.

(cherry picked from commit e2318ede04fa7a756d1c8151775e1f2406a176ca)
Signed-off-by: Reynold Xin <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27a1a5c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27a1a5c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27a1a5c9

Branch: refs/heads/branch-2.1
Commit: 27a1a5c99ff471ee15b56995d56cfd39b3ffe6e8
Parents: 1759cf6
Author: Eric Liang <[email protected]>
Authored: Mon Nov 28 21:58:01 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Mon Nov 28 21:58:10 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala     | 21 ++++++++++++--------
 .../command/createDataSourceTables.scala           |  3 ++-
 .../PartitionProviderCompatibilitySuite.scala      | 19 ++++++++++++++++++
 3 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2d86342..8294e41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        val tableType = if (storage.locationUri.isDefined) {
+        val existingTable = if (tableExists) {
+          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
+        } else {
+          None
+        }
+        val storage = if (tableExists) {
+          existingTable.get.storage
+        } else {
+          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+        }
+        val tableType = if (tableExists) {
+          existingTable.get.tableType
+        } else if (storage.locationUri.isDefined) {
           CatalogTableType.EXTERNAL
         } else {
           CatalogTableType.MANAGED
@@ -391,12 +402,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         )
         df.sparkSession.sessionState.executePlan(
           CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
-        if (tableDesc.partitionColumnNames.nonEmpty &&
-          df.sparkSession.sqlContext.conf.manageFilesourcePartitions) {
-          // Need to recover partitions into the metastore so our saved data is visible.
-          df.sparkSession.sessionState.executePlan(
-            AlterTableRecoverPartitionsCommand(tableDesc.identifier)).toRdd
-        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index add732c..422700c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -212,7 +212,8 @@ case class CreateDataSourceTableAsSelectCommand(
           className = provider,
           partitionColumns = table.partitionColumnNames,
           bucketSpec = table.bucketSpec,
-          options = table.storage.properties ++ pathOption)
+          options = table.storage.properties ++ pathOption,
+          catalogTable = Some(table))
 
         val result = try {
           dataSource.write(mode, df)

http://git-wip-us.apache.org/repos/asf/spark/blob/27a1a5c9/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index a1aa074..cace5fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -188,6 +188,25 @@ class PartitionProviderCompatibilitySuite
     }
   }
 
+  for (enabled <- Seq(true, false)) {
+    test(s"SPARK-18544 append with saveAsTable - partition management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          withTempDir { dir =>
+            setupPartitionedDatasourceTable("test", dir)
+            if (enabled) {
+              spark.sql("msck repair table test")
+            }
+            assert(spark.sql("select * from test").count() == 5)
+            spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+              .write.partitionBy("partCol").mode("append").saveAsTable("test")
+            assert(spark.sql("select * from test").count() == 15)
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Runs a test against a multi-level partitioned table, then validates that the custom locations
    * were respected by the output writer.
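
----------------------------------------------------------------------

For context, a minimal sketch (not part of the patch) of the scenario the commit message describes, intended for spark-shell on a build that includes this fix. The table name, column names, and path below are illustrative only, chosen to mirror the new test.

  import org.apache.spark.sql.SaveMode

  // Hypothetical scratch location; any writable local path works for this sketch.
  val dir = "/tmp/spark-18544-demo"

  // Create a partitioned data source table at an explicit path, which makes the
  // table EXTERNAL (see the storage.locationUri check in the diff above).
  spark.range(5).selectExpr("id as fieldOne", "id as partCol")
    .write.partitionBy("partCol").option("path", dir).saveAsTable("test")

  // Append to the now-existing table without repeating the path option. With this
  // patch the existing table's storage and tableType are looked up and reused, so
  // the appended files should land under `dir` rather than under the managed
  // warehouse location.
  spark.range(10).selectExpr("id as fieldOne", "id as partCol")
    .write.partitionBy("partCol").mode(SaveMode.Append).saveAsTable("test")

  // The table's original location should contain both the initial and appended data.
  assert(new java.io.File(dir).listFiles().nonEmpty)

The new test in PartitionProviderCompatibilitySuite exercises the same append path with metastore partition management both enabled and disabled.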
