This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c7d2fc05fd7 [HUDI-7781] Filter wrong partitions when using
hoodie.datasource.write.partitions.to.delete (#11260)
c7d2fc05fd7 is described below
commit c7d2fc05fd7f285abd36c561217bf67de4e0479f
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 22 17:51:34 2024 +0800
[HUDI-7781] Filter wrong partitions when using
hoodie.datasource.write.partitions.to.delete (#11260)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 10 ++++++---
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 24 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3c28b1a2e0a..87418764dea 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -573,10 +573,14 @@ class HoodieSparkSqlWriterInternal {
//note:spark-sql may url-encode special characters (* -> %2A)
     var (wildcardPartitions, fullPartitions) = partitions.partition(partition => partition.matches(".*(\\*|%2A).*"))
+    val allPartitions = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc): HoodieEngineContext,
+      HoodieMetadataConfig.newBuilder().fromProperties(cfg.getProps).build(), basePath)
+
+    if (fullPartitions.nonEmpty) {
+      fullPartitions = fullPartitions.filter(partition => allPartitions.contains(partition))
+    }
+
     if (wildcardPartitions.nonEmpty) {
-      //get list of all partitions
-      val allPartitions = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc): HoodieEngineContext,
-        HoodieMetadataConfig.newBuilder().fromProperties(cfg.getProps).build(), basePath)
       //go through list of partitions with wildcards and add all partitions that match to val fullPartitions
       wildcardPartitions.foreach(partition => {
         //turn wildcard into regex
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 911351f4013..d8a6c9379a3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -18,8 +18,9 @@
package org.apache.hudi
import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieReplaceCommitMetadata, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.TimelineUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
@@ -871,6 +872,27 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
}).count())
}
+  @Test
+  def testDeletePartitionsWithWrongPartition(): Unit = {
+    var (_, fooTableModifier) = deletePartitionSetup()
+    fooTableModifier = fooTableModifier
+      .updated(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key(), "2016/03/15" + "," + "2025/03")
+      .updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, spark.emptyDataFrame)
+    val snapshotDF3 = spark.read.format("org.apache.hudi").load(tempBasePath)
+    assertEquals(0, snapshotDF3.filter(entry => {
+      val partitionPath = entry.getString(3)
+      Seq("2015/03/16", "2015/03/17").count(p => partitionPath.equals(p)) != 1
+    }).count())
+
+    val activeTimeline = createMetaClient(spark, tempBasePath).getActiveTimeline
+    val metadata = TimelineUtils.getCommitMetadata(activeTimeline.lastInstant().get(), activeTimeline)
+      .asInstanceOf[HoodieReplaceCommitMetadata]
+    assertTrue(metadata.getOperationType.equals(WriteOperationType.DELETE_PARTITION))
+    // "2025/03" should not be in partitionToReplaceFileIds
+    assertEquals(Collections.singleton("2016/03/15"), metadata.getPartitionToReplaceFileIds.keySet())
+  }
+
+
/**
* Test case for non partition table with metatable support.
*/