Repository: spark
Updated Branches:
  refs/heads/branch-2.0 8d2fc010b -> ee6eea644
[SPARK-16034][SQL] Checks the partition columns when calling dataFrame.write.mode("append").saveAsTable

## What changes were proposed in this pull request?

`DataFrameWriter` can be used to append data to existing data source tables. It becomes tricky when the partition columns used in `DataFrameWriter.partitionBy(columns)` don't match the actual partition columns of the underlying table. This pull request enforces the check so that the partition columns of these two always match.

## How was this patch tested?

Unit test.

Author: Sean Zhong <seanzh...@databricks.com>

Closes #13749 from clockfly/SPARK-16034.

(cherry picked from commit ce3b98bae28af72299722f56e4e4ef831f471ec0)
Signed-off-by: Yin Huai <yh...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee6eea64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee6eea64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee6eea64

Branch: refs/heads/branch-2.0
Commit: ee6eea644fe0197a183385ef5879911ae8ab9ccb
Parents: 8d2fc01
Author: Sean Zhong <seanzh...@databricks.com>
Authored: Sat Jun 18 10:41:33 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat Jun 18 10:41:46 2016 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        |  9 ++++-
 .../sql/execution/datasources/DataSource.scala  | 39 ++++++++++----------
 .../spark/sql/execution/command/DDLSuite.scala  | 24 ++++++++++++
 3 files changed, 50 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee6eea64/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4918780..c38eca5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -242,8 +242,13 @@ case class CreateDataSourceTableAsSelectCommand(
       bucketSpec = bucketSpec,
       options = optionsWithPath)
 
-    val result = dataSource.write(mode, df)
-
+    val result = try {
+      dataSource.write(mode, df)
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        throw ex
+    }
     if (createMetastoreTable) {
       // We will use the schema of resolved.relation as the schema of the table (instead of
       // the schema of df). It is important since the nullability may be changed by the relation
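
The change above wraps `dataSource.write(mode, df)` in a log-and-rethrow block, so a partitioning mismatch surfaces both in the executor logs and as an `AnalysisException` to the caller. A minimal sketch of the user-facing behavior; the session settings and the table name `t` are illustrative, not part of the patch:

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object PartitionMismatchExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("SPARK-16034-example")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val df = Seq((1, 2, 3)).toDF("a", "b", "c")
        df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("t")

        try {
          // Partitioning by ("b", "a") does not match the existing ("a", "b"),
          // so with this patch the append fails with an AnalysisException
          // (logged by CreateDataSourceTableAsSelectCommand, then rethrown).
          df.write.mode("append").partitionBy("b", "a").saveAsTable("t")
        } catch {
          case e: AnalysisException => println(s"Append rejected: ${e.getMessage}")
        }

        spark.stop()
      }
    }
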
http://git-wip-us.apache.org/repos/asf/spark/blob/ee6eea64/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 7f3683f..f274fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -435,26 +435,25 @@ case class DataSource(
     // If we are appending to a table that already exists, make sure the partitioning matches
     // up. If we fail to load the table for whatever reason, ignore the check.
     if (mode == SaveMode.Append) {
-      val existingPartitionColumnSet = try {
-        Some(
-          resolveRelation()
-            .asInstanceOf[HadoopFsRelation]
-            .location
-            .partitionSpec()
-            .partitionColumns
-            .fieldNames
-            .toSet)
-      } catch {
-        case e: Exception =>
-          None
-      }
-
-      existingPartitionColumnSet.foreach { ex =>
-        if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) {
-          throw new AnalysisException(
-            s"Requested partitioning does not equal existing partitioning: " +
-            s"$ex != ${partitionColumns.toSet}.")
-        }
+      val existingColumns = Try {
+        resolveRelation()
+          .asInstanceOf[HadoopFsRelation]
+          .location
+          .partitionSpec()
+          .partitionColumns
+          .fieldNames
+          .toSeq
+      }.getOrElse(Seq.empty[String])
+      val sameColumns =
+        existingColumns.map(_.toLowerCase) == partitionColumns.map(_.toLowerCase)
+      if (existingColumns.size > 0 && !sameColumns) {
+        throw new AnalysisException(
+          s"""Requested partitioning does not match existing partitioning.
+             |Existing partitioning columns:
+             |  ${existingColumns.mkString(", ")}
+             |Requested partitioning columns:
+             |  ${partitionColumns.mkString(", ")}
+             |""".stripMargin)
       }
     }
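
Where the old code compared unordered sets and hid any resolution failure inside an Option, the rewritten check compares the lower-cased column names as sequences, so it is order-sensitive as well as case-insensitive, and an unresolvable table yields an empty sequence that skips the check. A standalone sketch of that logic; `checkPartitioning` is a hypothetical helper, and it throws `IllegalArgumentException` because `AnalysisException`'s constructor is not public outside the `org.apache.spark.sql` package:

    // Hypothetical restatement of the check above, for illustration only.
    def checkPartitioning(existingColumns: Seq[String], requestedColumns: Seq[String]): Unit = {
      // Seq equality (not Set equality) makes the column order significant;
      // lower-casing makes the comparison case-insensitive.
      val sameColumns =
        existingColumns.map(_.toLowerCase) == requestedColumns.map(_.toLowerCase)
      // An empty existingColumns stands for "table could not be resolved",
      // mirroring Try { ... }.getOrElse(Seq.empty), so the check is skipped.
      if (existingColumns.nonEmpty && !sameColumns) {
        throw new IllegalArgumentException(
          s"""Requested partitioning does not match existing partitioning.
             |Existing partitioning columns:
             |  ${existingColumns.mkString(", ")}
             |Requested partitioning columns:
             |  ${requestedColumns.mkString(", ")}
             |""".stripMargin)
      }
    }

    // e.g. checkPartitioning(Seq("a", "b"), Seq("b", "a")) throws,
    // while checkPartitioning(Seq.empty, Seq("b", "a")) silently passes.
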
http://git-wip-us.apache.org/repos/asf/spark/blob/ee6eea64/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7eb2fff..8827649 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1317,4 +1317,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
   }
 
+  test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
+    import testImplicits._
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    withTable("partitionedTable") {
+      df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("partitionedTable")
+      // Misses some partition columns
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("a").saveAsTable("partitionedTable")
+      }
+      // Wrong order
+      intercept[AnalysisException] {
+        df.write.mode("append").partitionBy("b", "a").saveAsTable("partitionedTable")
+      }
+      // Partition columns not specified
+      intercept[AnalysisException] {
+        df.write.mode("append").saveAsTable("partitionedTable")
+      }
+      assert(sql("select * from partitionedTable").collect().size == 1)
+      // Inserts new data successfully when partition columns are correctly specified in
+      // partitionBy(...).
+      df.write.mode("append").partitionBy("a", "b").saveAsTable("partitionedTable")
+      assert(sql("select * from partitionedTable").collect().size == 2)
+    }
+  }
 }
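
Since ScalaTest's `intercept` returns the caught exception, the suite could also pin down the new error message introduced by this patch. A hedged sketch in the same style, assuming it is added to the same DDLSuite; the test and table names are hypothetical:

    test("SPARK-16034 mismatch error message names both partitionings") {
      import testImplicits._
      val df = Seq((1, 2, 3)).toDF("a", "b", "c")
      withTable("pt") {
        df.write.mode("overwrite").partitionBy("a", "b").saveAsTable("pt")
        val e = intercept[AnalysisException] {
          // Wrong order relative to the existing ("a", "b") partitioning.
          df.write.mode("append").partitionBy("b", "a").saveAsTable("pt")
        }
        // The message text comes from the DataSource.scala change above.
        assert(e.getMessage.contains("Requested partitioning does not match existing partitioning"))
      }
    }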