This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7d2fc05fd7 [HUDI-7781] Filter wrong partitions when using hoodie.datasource.write.partitions.to.delete (#11260)
c7d2fc05fd7 is described below

commit c7d2fc05fd7f285abd36c561217bf67de4e0479f
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 22 17:51:34 2024 +0800

    [HUDI-7781] Filter wrong partitions when using hoodie.datasource.write.partitions.to.delete (#11260)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 10 ++++++---
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 24 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3c28b1a2e0a..87418764dea 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -573,10 +573,14 @@ class HoodieSparkSqlWriterInternal {
     //note:spark-sql may url-encode special characters (* -> %2A)
     var (wildcardPartitions, fullPartitions) = partitions.partition(partition => partition.matches(".*(\\*|%2A).*"))
 
+    val allPartitions = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc): HoodieEngineContext,
+      HoodieMetadataConfig.newBuilder().fromProperties(cfg.getProps).build(), basePath)
+
+    if (fullPartitions.nonEmpty) {
+      fullPartitions = fullPartitions.filter(partition => allPartitions.contains(partition))
+    }
+
     if (wildcardPartitions.nonEmpty) {
-      //get list of all partitions
-      val allPartitions = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc): HoodieEngineContext,
-        HoodieMetadataConfig.newBuilder().fromProperties(cfg.getProps).build(), basePath)
       //go through list of partitions with wildcards and add all partitions that match to val fullPartitions
       wildcardPartitions.foreach(partition => {
         //turn wildcard into regex
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 911351f4013..d8a6c9379a3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -18,8 +18,9 @@
 package org.apache.hudi
 
 import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieReplaceCommitMetadata, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.TimelineUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
@@ -871,6 +872,27 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     }).count())
   }
 
+  @Test
+  def testDeletePartitionsWithWrongPartition(): Unit = {
+    var (_, fooTableModifier) = deletePartitionSetup()
+    fooTableModifier = fooTableModifier
+      .updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), "2016/03/15" + "," + "2025/03")
+      .updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame)
+    val snapshotDF3 = spark.read.format("org.apache.hudi").load(tempBasePath)
+    assertEquals(0, snapshotDF3.filter(entry => {
+      val partitionPath = entry.getString(3)
+      Seq("2015/03/16", "2015/03/17").count(p => partitionPath.equals(p)) != 1
+    }).count())
+
+    val activeTimeline = createMetaClient(spark, tempBasePath).getActiveTimeline
+    val metadata = TimelineUtils.getCommitMetadata(activeTimeline.lastInstant().get(), activeTimeline)
+      .asInstanceOf[HoodieReplaceCommitMetadata]
+    assertTrue(metadata.getOperationType.equals(WriteOperationType.DELETE_PARTITION))
+    // "2025/03" should not be in partitionToReplaceFileIds
+    assertEquals(Collections.singleton("2016/03/15"), metadata.getPartitionToReplaceFileIds.keySet())
+  }
+
   /**
    * Test case for non partition table with metatable support.
    */

Reply via email to