This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 9cd64f8  [SPARK-37217][SQL][3.2] The number of dynamic partitions should be checked early when writing to external tables
9cd64f8 is described below

commit 9cd64f8fed396d38a4d2f64559ad00c261dad47e
Author: sychen <[email protected]>
AuthorDate: Tue Dec 14 10:18:53 2021 -0800

    [SPARK-37217][SQL][3.2] The number of dynamic partitions should be checked early when writing to external tables
    
    ### What changes were proposed in this pull request?
    SPARK-29295 introduced a mechanism whereby a dynamic-partition overwrite of an external table first deletes the data in the target partitions.
    
    Assuming 1001 partitions are written, the data of those 1001 partitions is deleted first. But because `hive.exec.max.dynamic.partitions` defaults to 1000, `loadDynamicPartitions` then fails, and at that point the data of the 1001 partitions has already been deleted.
    
    So we can check whether the number of dynamic partitions exceeds `hive.exec.max.dynamic.partitions` before deleting anything; the job then fails fast, before any data is removed.
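
    As an illustration, a minimal sketch of the scenario, mirroring the new test below. It assumes a Hive-enabled SparkSession `spark` and the `withSQLConf` test helper; the table name, location, and values are placeholders:

    ```scala
    // External, partitioned table backed by an existing directory.
    spark.sql(
      "CREATE EXTERNAL TABLE t (id INT) PARTITIONED BY (p STRING) " +
        "STORED AS PARQUET LOCATION '/tmp/t'")

    withSQLConf(
        // Take the Hive SerDe write path, as in the new test.
        "spark.sql.hive.convertMetastoreParquet" -> "false",
        "hive.exec.dynamic.partition.mode" -> "nonstrict",
        "hive.exec.max.dynamic.partitions" -> "2") {
      // Writes 3 dynamic partitions, exceeding the limit of 2. Previously the
      // data of all 3 target partitions was deleted before loadDynamicPartitions
      // failed; with this change the insert fails before anything is deleted.
      spark.sql(
        "INSERT OVERWRITE TABLE t PARTITION (p) " +
          "SELECT * FROM VALUES (1, 'a'), (2, 'b'), (3, 'c') AS v(id, p)")
    }
    ```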
    
    ### Why are the changes needed?
    Avoid data that cannot be recovered when the job fails.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test.
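    The test can be run with, e.g., `build/sbt "hive/testOnly *SQLQuerySuite -- -z SPARK-37217"` (sbt project and filter names assumed from the standard Spark dev workflow).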
    
    Closes #34889 from cxzl25/SPARK-37217-3.2.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Chao Sun <[email protected]>
---
 .../spark/sql/errors/QueryExecutionErrors.scala    | 11 +++++++++
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  9 +++++++
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 28 ++++++++++++++++++++++
 3 files changed, 48 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 7f77243..d4fbd38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1804,4 +1804,15 @@ object QueryExecutionErrors {
   def pivotNotAfterGroupByUnsupportedError(): Throwable = {
    new UnsupportedOperationException("pivot is only supported after a groupBy")
   }
+
+  def writePartitionExceedConfigSizeWhenDynamicPartitionError(
+      numWrittenParts: Int,
+      maxDynamicPartitions: Int,
+      maxDynamicPartitionsKey: String): Throwable = {
+    new SparkException(
+      s"Number of dynamic partitions created is $numWrittenParts" +
+        s", which is more than $maxDynamicPartitions" +
+        s". To solve this try to set $maxDynamicPartitionsKey" +
+        s" to at least $numWrittenParts.")
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 108401c..4a678f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
@@ -212,6 +213,14 @@ case class InsertIntoHiveTable(
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
         if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+          val numWrittenParts = writtenParts.size
+          val maxDynamicPartitionsKey = HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+          val maxDynamicPartitions = hadoopConf.getInt(maxDynamicPartitionsKey,
+            HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.defaultIntVal)
+          if (numWrittenParts > maxDynamicPartitions) {
+            throw QueryExecutionErrors.writePartitionExceedConfigSizeWhenDynamicPartitionError(
+              numWrittenParts, maxDynamicPartitions, maxDynamicPartitionsKey)
+          }
          // SPARK-29295: When insert overwrite to a Hive external table partition, if the
          // partition does not exist, Hive will not check if the external partition directory
          // exists or not before copying files. So if users drop the partition, and then do
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8d248bb..ba362d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2659,6 +2659,34 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       }
     }
   }
+
+  test("SPARK-37217: Dynamic partitions should fail quickly " +
+    "when writing to external tables to prevent data deletion") {
+    withTable("test") {
+      withTempDir { f =>
+        sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (p1 string, p2 
string) " +
+          s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")
+
+        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+          "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          val insertSQL = """
+              |INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2)
+              |SELECT * FROM VALUES (1, 'n2'), (2, 'n3'), (3, 'n4') AS t(id, p2)
+              """.stripMargin
+          withSQLConf("hive.exec.max.dynamic.partitions" -> "2") {
+            val e = intercept[SparkException] {
+              sql(insertSQL)
+            }
+            assert(e.getMessage.contains("Number of dynamic partitions created"))
+          }
+
+          withSQLConf("hive.exec.max.dynamic.partitions" -> "3") {
+            sql(insertSQL)
+          }
+        }
+      }
+    }
+  }
 }
 
 @SlowHiveTest
