This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c20c01978b4 [HUDI-7685] Fix delete partition instant commit in
partition TTL (#11117)
c20c01978b4 is described below
commit c20c01978b4c1e7daf1e89de535dce85bebc0d93
Author: Manu <[email protected]>
AuthorDate: Wed May 1 16:45:27 2024 +0800
[HUDI-7685] Fix delete partition instant commit in partition TTL (#11117)
---
.../hudi/client/utils/DeletePartitionUtils.java | 4 +-
...eletePartitionPendingTableServiceException.java | 28 +++++++
.../ttl/strategy/KeepByCreationTimeStrategy.java | 2 +-
.../action/ttl/strategy/KeepByTimeStrategy.java | 2 +-
.../commit/SparkPartitionTTLActionExecutor.java | 15 +++-
.../apache/hudi/testutils/DataSourceTestUtils.java | 12 +++
.../TestHoodieSparkSqlWriterPartitionTTL.scala | 94 ++++++++++++++++++++++
7 files changed, 152 insertions(+), 5 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
index 92c2065457e..73576efda26 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/DeletePartitionUtils.java
@@ -19,7 +19,7 @@
package org.apache.hudi.client.utils;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
-import org.apache.hudi.exception.HoodieDeletePartitionException;
+import
org.apache.hudi.exception.HoodieDeletePartitionPendingTableServiceException;
import org.apache.hudi.table.HoodieTable;
import java.util.ArrayList;
@@ -67,7 +67,7 @@ public class DeletePartitionUtils {
.forEach(x ->
instantsOfOffendingPendingTableServiceAction.add(x.getRight().getTimestamp()));
if (instantsOfOffendingPendingTableServiceAction.size() > 0) {
- throw new HoodieDeletePartitionException("Failed to drop partitions. "
+ throw new HoodieDeletePartitionPendingTableServiceException("Failed to
drop partitions. "
+ "Please ensure that there are no pending table service actions
(clustering/compaction) for the partitions to be deleted: " + partitionsToDrop
+ ". "
+ "Instant(s) of offending pending table service action: "
+
instantsOfOffendingPendingTableServiceAction.stream().distinct().collect(Collectors.toList()));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionPendingTableServiceException.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionPendingTableServiceException.java
new file mode 100644
index 00000000000..3e4e2574965
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieDeletePartitionPendingTableServiceException.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieDeletePartitionPendingTableServiceException extends
HoodieDeletePartitionException {
+
+ public HoodieDeletePartitionPendingTableServiceException(String msg) {
+ super(msg);
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
index 038b82f6dc5..96b28b25c5c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByCreationTimeStrategy.java
@@ -28,7 +28,7 @@ import java.util.List;
import java.util.stream.Collectors;
/**
- * KeepByTimeStrategy will return expired partitions by their lastCommitTime.
+ * KeepByCreationTimeStrategy will return expired partitions by their creation time.
*/
public class KeepByCreationTimeStrategy extends KeepByTimeStrategy {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
index b6d67bb9e8a..7051a9b59fb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/ttl/strategy/KeepByTimeStrategy.java
@@ -103,7 +103,7 @@ public class KeepByTimeStrategy extends
PartitionTTLStrategy {
* @param referenceTime last commit time or creation time for partition
*/
protected boolean isPartitionExpired(String referenceTime) {
- String expiredTime =
instantTimePlusMillis(fixInstantTimeCompatibility(referenceTime), ttlInMilis);
+ String expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis);
return fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
index 166fc3672fa..b010325a8d4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkPartitionTTLActionExecutor.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
+import
org.apache.hudi.exception.HoodieDeletePartitionPendingTableServiceException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
public class SparkPartitionTTLActionExecutor<T>
@@ -47,11 +49,22 @@ public class SparkPartitionTTLActionExecutor<T>
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ HoodieWriteMetadata<HoodieData<WriteStatus>> emptyResult = new
HoodieWriteMetadata<>();
+ emptyResult.setPartitionToReplaceFileIds(Collections.emptyMap());
+ emptyResult.setWriteStatuses(context.emptyHoodieData());
try {
PartitionTTLStrategy strategy =
HoodiePartitionTTLStrategyFactory.createStrategy(table, config.getProps(),
instantTime);
List<String> expiredPartitions = strategy.getExpiredPartitionPaths();
+ if (expiredPartitions.isEmpty()) {
+ return emptyResult;
+ }
LOG.info("Partition ttl find the following expired partitions to delete:
" + String.join(",", expiredPartitions));
- return new SparkDeletePartitionCommitActionExecutor<>(context, config,
table, instantTime, expiredPartitions).execute();
+ // Auto commit is disabled in the config, so copy the config and enable auto commit
for SparkDeletePartitionCommitActionExecutor.
+ HoodieWriteConfig autoCommitConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps()).withAutoCommit(true).build();
+ return new SparkDeletePartitionCommitActionExecutor<>(context,
autoCommitConfig, table, instantTime, expiredPartitions).execute();
+ } catch (HoodieDeletePartitionPendingTableServiceException
deletePartitionPendingTableServiceException) {
+ LOG.info("Partition is under table service, do nothing, call delete
partition next time.");
+ return emptyResult;
} catch (IOException e) {
throw new HoodieIOException("Error executing hoodie partition ttl: ", e);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 10642318325..f00878069ee 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -74,6 +74,18 @@ public class DataSourceTestUtils {
return toReturn;
}
+ public static List<Row> generateRandomRowsByPartition(int count, String
partition) {
+ List<Row> toReturn = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ Object[] values = new Object[3];
+ values[0] =
HoodieTestDataGenerator.genPseudoRandomUUID(RANDOM).toString();
+ values[1] = partition;
+ values[2] = new Date().getTime();
+ toReturn.add(RowFactory.create(values));
+ }
+ return toReturn;
+ }
+
public static List<Row> generateUpdates(List<Row> records, int count) {
List<Row> toReturn = new ArrayList<>();
for (int i = 0; i < count; i++) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
new file mode 100644
index 00000000000..00614339362
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
@@ -0,0 +1,94 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
+import org.apache.hudi.common.model.HoodieFileFormat
+import
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.{fixInstantTimeCompatibility,
instantTimePlusMillis}
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.testutils.HoodieTestUtils
+import org.apache.hudi.config.{HoodieTTLConfig, HoodieWriteConfig}
+import org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Test
+import org.apache.hudi.table.HoodieTable
+
+
+/**
+ * The original PartitionTTLStrategy calculates the expiry time in days, which is too
long for a test.
+ * Overrides the method isPartitionExpired to calculate the expiry time in minutes instead.
+ * @param hoodieTable
+ * @param instantTime
+ */
+class HoodieSparkSqlWriterTestStrategy(hoodieTable: HoodieTable[_, _, _, _],
instantTime: String)
+ extends KeepByTimeStrategy(hoodieTable, instantTime) {
+ override def isPartitionExpired(referenceTime: String): Boolean = {
+ val expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 /
60)
+ fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0
+ }
+}
+
+class TestHoodieSparkSqlWriterPartitionTTL extends HoodieSparkWriterTestBase {
+
+ /**
+ * Test partition ttl with HoodieSparkSqlWriter.
+ */
+ @Test
+ def testSparkSqlWriterWithPartitionTTL(): Unit = {
+ val hoodieFooTableName = "hoodie_foo_tbl"
+ val fooTableModifier = Map("path" -> tempBasePath,
+ HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+ HoodieWriteConfig.BASE_FILE_FORMAT.key -> HoodieFileFormat.PARQUET.name,
+ DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+ HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "4",
+ DataSourceWriteOptions.OPERATION.key ->
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
+ HoodieTTLConfig.INLINE_PARTITION_TTL.key() -> "true",
+ HoodieTTLConfig.DAYS_RETAIN.key() -> "1",
+ HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME.key() ->
"org.apache.hudi.HoodieSparkSqlWriterTestStrategy"
+ )
+
+ val schema = DataSourceTestUtils.getStructTypeExampleSchema
+ val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val recordsForPart1 =
DataSourceTestUtils.generateRandomRowsByPartition(100, "part1")
+ val recordsSeqForPart1 = convertRowListToSeq(recordsForPart1)
+ val part1DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart1),
structType)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier,
part1DF)
+
+ // Wait for part1 to expire.
+ Thread.sleep(60 * 1000)
+ val recordsForPart2 =
DataSourceTestUtils.generateRandomRowsByPartition(100, "part2")
+ val recordsSeqForPart2 = convertRowListToSeq(recordsForPart2)
+ val part2DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart2),
structType)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier,
part2DF)
+
+ val timeline =
HoodieTestUtils.createMetaClient(tempBasePath).getActiveTimeline
+ assert(timeline.getCompletedReplaceTimeline.getInstants.size() > 0)
+ val replaceInstant =
timeline.getCompletedReplaceTimeline.getInstants.get(0)
+ val replaceMetadata =
TimelineMetadataUtils.deserializeReplaceCommitMetadata(timeline.getInstantDetails(replaceInstant).get())
+ assert(replaceMetadata.getPartitionToReplaceFileIds.containsKey("part1"))
+ }
+
+}