This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c20c01978b4 [HUDI-7685] Fix delete partition instant commit in 
partition TTL (#11117)
c20c01978b4 is described below

commit c20c01978b4c1e7daf1e89de535dce85bebc0d93
Author: Manu <[email protected]>
AuthorDate: Wed May 1 16:45:27 2024 +0800

    [HUDI-7685] Fix delete partition instant commit in partition TTL (#11117)
---
 .../hudi/client/utils/DeletePartitionUtils.java    |  4 +-
 ...eletePartitionPendingTableServiceException.java | 28 +++++++
 .../ttl/strategy/KeepByCreationTimeStrategy.java   |  2 +-
 .../action/ttl/strategy/KeepByTimeStrategy.java    |  2 +-
 .../commit/SparkPartitionTTLActionExecutor.java    | 15 +++-
 .../apache/hudi/testutils/DataSourceTestUtils.java | 12 +++
 .../TestHoodieSparkSqlWriterPartitionTTL.scala     | 94 ++++++++++++++++++++++
 7 files changed, 152 insertions(+), 5 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
index 92c2065457e..73576efda26 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.client.utils;
 
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
-import org.apache.hudi.exception.HoodieDeletePartitionException;
+import 
org.apache.hudi.exception.HoodieDeletePartitionPendingTableServiceException;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.ArrayList;
@@ -67,7 +67,7 @@ public class DeletePartitionUtils {
         .forEach(x -> 
instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));
 
     if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
-      throw new HoodieDeletePartitionException("Failed to drop partitions. "
+      throw new HoodieDeletePartitionPendingTableServiceException("Failed to 
drop partitions. "
           + "Please ensure that there are no pending table service actions 
(clustering/compaction) for the partitions to be deleted: " + partitionsToDrop 
+ ". "
           + "Instant(s) of offending pending table service action: "
           + 
instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionPendingTableServiceException.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionPendingTableServiceException.java
new file mode 100644
index 00000000000..3e4e2574965
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionPendingTableServiceException.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieDeletePartitionPendingTableServiceException extends 
HoodieDeletePartitionException {
+
+  public HoodieDeletePartitionPendingTableServiceException(String msg) {
+    super(msg);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
index 038b82f6dc5..96b28b25c5c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * KeepByTimeStrategy will return expired partitions by their lastCommitTime.
+ * KeepByCreationTimeStrategy will return expired partitions by their creation time.
  */
 public class KeepByCreationTimeStrategy extends KeepByTimeStrategy {
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
index b6d67bb9e8a..7051a9b59fb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
@@ -103,7 +103,7 @@ public class KeepByTimeStrategy extends 
PartitionTTLStrategy {
    * @param referenceTime last commit time or creation time for partition
    */
   protected boolean isPartitionExpired(String referenceTime) {
-    String expiredTime = 
instantTimePlusMillis(fixInstantTimeCompatibility(referenceTime), ttlInMilis);
+    String expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis);
     return fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0;
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
index 166fc3672fa..b010325a8d4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
+import 
org.apache.hudi.exception.HoodieDeletePartitionPendingTableServiceException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 public class SparkPartitionTTLActionExecutor<T>
@@ -47,11 +49,22 @@ public class SparkPartitionTTLActionExecutor<T>
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    HoodieWriteMetadata<HoodieData<WriteStatus>> emptyResult = new 
HoodieWriteMetadata<>();
+    emptyResult.setPartitionToReplaceFileIds(Collections.emptyMap());
+    emptyResult.setWriteStatuses(context.emptyHoodieData());
     try {
       PartitionTTLStrategy strategy = 
HoodiePartitionTTLStrategyFactory.createStrategy(table, config.getProps(), 
instantTime);
       List<String> expiredPartitions = strategy.getExpiredPartitionPaths();
+      if (expiredPartitions.isEmpty()) {
+        return emptyResult;
+      }
       LOG.info("Partition ttl find the following expired partitions to delete: 
 " + String.join(",", expiredPartitions));
-      return new SparkDeletePartitionCommitActionExecutor<>(context, config, 
table, instantTime, expiredPartitions).execute();
+      // Auto-commit is disabled in the config; copy the config and enable auto-commit
+      // for SparkDeletePartitionCommitActionExecutor.
+      HoodieWriteConfig autoCommitConfig = 
HoodieWriteConfig.newBuilder().withProperties(config.getProps()).withAutoCommit(true).build();
+      return new SparkDeletePartitionCommitActionExecutor<>(context, 
autoCommitConfig, table, instantTime, expiredPartitions).execute();
+    } catch (HoodieDeletePartitionPendingTableServiceException 
deletePartitionPendingTableServiceException) {
+      LOG.info("Partition is under table service, do nothing, call delete 
partition next time.");
+      return emptyResult;
     } catch (IOException e) {
       throw new HoodieIOException("Error executing hoodie partition ttl: ", e);
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 10642318325..f00878069ee 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -74,6 +74,18 @@ public class DataSourceTestUtils {
     return toReturn;
   }
 
+  public static List<Row> generateRandomRowsByPartition(int count, String 
partition) {
+    List<Row> toReturn = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      Object[] values = new Object[3];
+      values[0] = 
HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
+      values[1] = partition;
+      values[2] = new Date().getTime();
+      toReturn.add(RowFactory.create(values));
+    }
+    return toReturn;
+  }
+
   public static List<Row> generateUpdates(List<Row> records, int count) {
     List<Row> toReturn = new ArrayList<>();
     for (int i = 0; i < count; i++) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
new file mode 100644
index 00000000000..00614339362
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
@@ -0,0 +1,94 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
+import org.apache.hudi.common.model.HoodieFileFormat
+import 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.{fixInstantTimeCompatibility,
 instantTimePlusMillis}
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.config.{HoodieTTLConfig, HoodieWriteConfig}
+import org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Test
+import org.apache.hudi.table.HoodieTable
+
+
+/**
+ * The original PartitionTTLStrategy calculates the expiration time in DAYS, which is too long for a test.
+ * Overrides isPartitionExpired to calculate the expiration time in minutes instead.
+ * @param hoodieTable
+ * @param instantTime
+ */
+class HoodieSparkSqlWriterTestStrategy(hoodieTable: HoodieTable[_, _, _, _], 
instantTime: String)
+  extends KeepByTimeStrategy(hoodieTable, instantTime) {
+  override def isPartitionExpired(referenceTime: String): Boolean = {
+    val expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 / 
60)
+    fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0
+  }
+}
+
+class TestHoodieSparkSqlWriterPartitionTTL extends HoodieSparkWriterTestBase {
+
+  /**
+   *  Test partition ttl with HoodieSparkSqlWriter.
+   */
+  @Test
+  def testSparkSqlWriterWithPartitionTTL(): Unit = {
+    val hoodieFooTableName = "hoodie_foo_tbl"
+    val fooTableModifier = Map("path" -> tempBasePath,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      HoodieWriteConfig.BASE_FILE_FORMAT.key -> HoodieFileFormat.PARQUET.name,
+      DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+      HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "4",
+      DataSourceWriteOptions.OPERATION.key -> 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
+      HoodieTTLConfig.INLINE_PARTITION_TTL.key() -> "true",
+      HoodieTTLConfig.DAYS_RETAIN.key() -> "1",
+      HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME.key() -> 
"org.apache.hudi.HoodieSparkSqlWriterTestStrategy"
+    )
+
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val recordsForPart1 = 
DataSourceTestUtils.generateRandomRowsByPartition(100, "part1")
+    val recordsSeqForPart1 = convertRowListToSeq(recordsForPart1)
+    val part1DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart1), 
structType)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, 
part1DF)
+
+    // Wait for part1 to expire.
+    Thread.sleep(60 * 1000)
+    val recordsForPart2 = 
DataSourceTestUtils.generateRandomRowsByPartition(100, "part2")
+    val recordsSeqForPart2 = convertRowListToSeq(recordsForPart2)
+    val part2DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart2), 
structType)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, 
part2DF)
+
+    val timeline = 
HoodieTestUtils.createMetaClient(tempBasePath).getActiveTimeline
+    assert(timeline.getCompletedReplaceTimeline.getInstants.size() > 0)
+    val replaceInstant = 
timeline.getCompletedReplaceTimeline.getInstants.get(0)
+    val replaceMetadata = 
TimelineMetadataUtils.deserializeReplaceCommitMetadata(timeline.getInstantDetails(replaceInstant).get())
+    assert(replaceMetadata.getPartitionToReplaceFileIds.containsKey("part1"))
+  }
+
+}

Reply via email to