This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new c017422  [SPARK-29295][SQL][2.4] Insert overwrite to Hive external 
table partition should delete old data
c017422 is described below

commit c017422c6582121075738746cf9c7ae2257c658d
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Thu Mar 12 03:00:35 2020 -0700

    [SPARK-29295][SQL][2.4] Insert overwrite to Hive external table partition 
should delete old data
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to delete old Hive external partition directory even 
the partition does not exist in Hive, when insert overwrite Hive external table 
partition.
    
    This is backport of #25979 to branch-2.4.
    
    ### Why are the changes needed?
    
    When insert overwrite to a Hive external table partition, if the partition 
does not exist, Hive will not check if the external partition directory exists 
or not before copying files. So if users drop the partition, and then do insert 
overwrite to the same partition, the partition will have both old and new data.
    
    For example:
    ```scala
    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
      // test is an external Hive table.
      sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
      sql("ALTER TABLE test DROP PARTITION(name='n1')")
      sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
      sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id") // Got both 1 
and 2.
    }
    ```
    
    ### Does this PR introduce any user-facing change?
    
    Yes. This fix a correctness issue when users drop partition on a Hive 
external table partition and then insert overwrite it.
    
    ### How was this patch tested?
    
    Added test.
    
    Closes #27887 from viirya/SPARK-29295-2.4.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/hive/execution/InsertIntoHiveTable.scala   | 68 +++++++++++++++---
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 80 ++++++++++++++++++++++
 2 files changed, 139 insertions(+), 9 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 0ed464d..1365737 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
ExternalCatalog, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
@@ -192,7 +192,7 @@ case class InsertIntoHiveTable(
       }.asInstanceOf[Attribute]
     }
 
-    saveAsHiveFile(
+    val writtenParts = saveAsHiveFile(
       sparkSession = sparkSession,
       plan = child,
       hadoopConf = hadoopConf,
@@ -202,6 +202,42 @@ case class InsertIntoHiveTable(
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+          // SPARK-29295: When insert overwrite to a Hive external table 
partition, if the
+          // partition does not exist, Hive will not check if the external 
partition directory
+          // exists or not before copying files. So if users drop the 
partition, and then do
+          // insert overwrite to the same partition, the partition will have 
both old and new
+          // data. We construct partition path. If the path exists, we delete 
it manually.
+          writtenParts.foreach { partPath =>
+            val dpMap = partPath.split("/").map { part =>
+              val splitPart = part.split("=")
+              assert(splitPart.size == 2, s"Invalid written partition path: 
$part")
+              ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+                ExternalCatalogUtils.unescapePathName(splitPart(1))
+            }.toMap
+
+            val updatedPartitionSpec = partition.map {
+              case (key, Some(value)) => key -> value
+              case (key, None) if dpMap.contains(key) => key -> dpMap(key)
+              case (key, _) =>
+                throw new SparkException(s"Dynamic partition key $key is not 
among " +
+                  "written partition paths.")
+            }
+            val partitionColumnNames = table.partitionColumnNames
+            val tablePath = new Path(table.location)
+            val partitionPath = 
ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
+              partitionColumnNames, tablePath)
+
+            val fs = partitionPath.getFileSystem(hadoopConf)
+            if (fs.exists(partitionPath)) {
+              if (!fs.delete(partitionPath, true)) {
+                throw new RuntimeException(
+                  "Cannot remove partition directory '" + 
partitionPath.toString)
+              }
+            }
+          }
+        }
+
         externalCatalog.loadDynamicPartitions(
           db = table.database,
           table = table.identifier.table,
@@ -223,18 +259,32 @@ case class InsertIntoHiveTable(
         var doHiveOverwrite = overwrite
 
         if (oldPart.isEmpty || !ifPartitionNotExists) {
+          // SPARK-29295: When insert overwrite to a Hive external table 
partition, if the
+          // partition does not exist, Hive will not check if the external 
partition directory
+          // exists or not before copying files. So if users drop the 
partition, and then do
+          // insert overwrite to the same partition, the partition will have 
both old and new
+          // data. We construct partition path. If the path exists, we delete 
it manually.
+          val partitionPath = if (oldPart.isEmpty && overwrite
+            && table.tableType == CatalogTableType.EXTERNAL) {
+            val partitionColumnNames = table.partitionColumnNames
+            val tablePath = new Path(table.location)
+            Some(ExternalCatalogUtils.generatePartitionPath(partitionSpec,
+              partitionColumnNames, tablePath))
+          } else {
+            oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
+          }
+
           // SPARK-18107: Insert overwrite runs much slower than hive-client.
           // Newer Hive largely improves insert overwrite performance. As 
Spark uses older Hive
           // version and we may not want to catch up new Hive version every 
time. We delete the
           // Hive partition first and then load data file into the Hive 
partition.
-          if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
+          if (partitionPath.nonEmpty && overwrite) {
+            partitionPath.foreach { path =>
+              val fs = path.getFileSystem(hadoopConf)
+              if (fs.exists(path)) {
+                if (!fs.delete(path, true)) {
                   throw new RuntimeException(
-                    "Cannot remove partition directory '" + 
partitionPath.toString)
+                    "Cannot remove partition directory '" + path.toString)
                 }
                 // Don't let Hive do overwrite operation since it is slower.
                 doHiveOverwrite = false
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9f57776..aac1ae8 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2375,4 +2375,84 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-29295: insert overwrite external partition should not have old 
data") {
+    Seq("true", "false").foreach { convertParquet =>
+      withTable("spark29295") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE spark29295(id int) PARTITIONED BY (name 
string) STORED AS " +
+            s"PARQUET LOCATION '${f.getAbsolutePath}'")
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> 
convertParquet) {
+            sql("INSERT OVERWRITE TABLE spark29295 PARTITION(name='n1') SELECT 
1")
+            sql("ALTER TABLE spark29295 DROP PARTITION(name='n1')")
+            sql("INSERT OVERWRITE TABLE spark29295 PARTITION(name='n1') SELECT 
2")
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE name = 'n1' ORDER 
BY id"),
+              Array(Row(2)))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-29295: dynamic insert overwrite external partition should not 
have old data") {
+    Seq("true", "false").foreach { convertParquet =>
+      withTable("spark29295") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE spark29295(id int) PARTITIONED BY (p1 
string, p2 string) " +
+            s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> 
convertParquet,
+            "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+            sql(
+              """
+                |INSERT OVERWRITE TABLE spark29295 PARTITION(p1='n1', p2)
+                |SELECT * FROM VALUES (1, 'n2'), (2, 'n3') AS t(id, p2)
+              """.stripMargin)
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE p1 = 'n1' and p2 
= 'n2' ORDER BY id"),
+              Array(Row(1)))
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE p1 = 'n1' and p2 
= 'n3' ORDER BY id"),
+              Array(Row(2)))
+
+            sql("INSERT OVERWRITE TABLE spark29295 PARTITION(p1='n1', p2) 
SELECT 4, 'n4'")
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE p1 = 'n1' and p2 
= 'n4' ORDER BY id"),
+              Array(Row(4)))
+
+            sql("ALTER TABLE spark29295 DROP PARTITION(p1='n1',p2='n2')")
+            sql("ALTER TABLE spark29295 DROP PARTITION(p1='n1',p2='n3')")
+
+            sql(
+              """
+                |INSERT OVERWRITE TABLE spark29295 PARTITION(p1='n1', p2)
+                |SELECT * FROM VALUES (5, 'n2'), (6, 'n3') AS t(id, p2)
+              """.stripMargin)
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE p1 = 'n1' and p2 
= 'n2' ORDER BY id"),
+              Array(Row(5)))
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE p1 = 'n1' and p2 
= 'n3' ORDER BY id"),
+              Array(Row(6)))
+            // Partition not overwritten should not be deleted.
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE p1 = 'n1' and p2 
= 'n4' ORDER BY id"),
+              Array(Row(4)))
+          }
+        }
+      }
+
+      withTable("spark29295") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE spark29295(id int) PARTITIONED BY (p1 
string, p2 string) " +
+            s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> 
convertParquet,
+            "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+            // We should unescape partition value.
+            sql("INSERT OVERWRITE TABLE spark29295 PARTITION(p1='n1', p2) 
SELECT 1, '/'")
+            sql("ALTER TABLE spark29295 DROP PARTITION(p1='n1',p2='/')")
+            sql("INSERT OVERWRITE TABLE spark29295 PARTITION(p1='n1', p2) 
SELECT 2, '/'")
+            checkAnswer(sql("SELECT id FROM spark29295 WHERE p1 = 'n1' and p2 
= '/' ORDER BY id"),
+              Array(Row(2)))
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to