This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fdb94192508 [HUDI-7715] Partition TTL for Flink (#11156)
fdb94192508 is described below
commit fdb94192508a3d76fdba63429d9b0df718316a7e
Author: Manu <[email protected]>
AuthorDate: Tue May 7 09:00:41 2024 +0800
[HUDI-7715] Partition TTL for Flink (#11156)
---
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 3 +-
.../commit/FlinkPartitionTTLActionExecutor.java | 73 ++++++++++++++++++
.../hudi/sink/TestWriterWithPartitionTTl.java | 89 ++++++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 8 ++
.../TestHoodieSparkSqlWriterPartitionTTL.scala | 4 +-
5 files changed, 173 insertions(+), 4 deletions(-)
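
For context, a minimal sketch of how inline partition TTL can be switched on
for the Flink writer, using the config keys exercised by the new test in this
commit (the one-day retention value is illustrative, not part of this change):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.config.HoodieTTLConfig;

    Configuration conf = new Configuration();
    // Run partition TTL inline as part of each write commit.
    conf.setBoolean(HoodieTTLConfig.INLINE_PARTITION_TTL.key(), true);
    // Expire partitions older than one day (illustrative value).
    conf.setString(HoodieTTLConfig.DAYS_RETAIN.key(), "1");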
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 1ea69d3a109..4fd217ce4bd 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -61,6 +61,7 @@ import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkPartitionTTLActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
@@ -398,7 +399,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> managePartitionTTL(HoodieEngineContext context, String instantTime) {
-    throw new HoodieNotSupportedException("Manage partition ttl is not supported yet");
+    return new FlinkPartitionTTLActionExecutor(context, config, this, instantTime).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
new file mode 100644
index 00000000000..f167fb5a916
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieDeletePartitionPendingTableServiceException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.ttl.strategy.HoodiePartitionTTLStrategyFactory;
+import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class FlinkPartitionTTLActionExecutor<T> extends BaseFlinkCommitActionExecutor<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkPartitionTTLActionExecutor.class);
+
+  public FlinkPartitionTTLActionExecutor(HoodieEngineContext context,
+                                         HoodieWriteConfig config,
+                                         HoodieTable table,
+                                         String instantTime) {
+    super(context, null, config, table, instantTime, WriteOperationType.DELETE_PARTITION);
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    HoodieWriteMetadata<List<WriteStatus>> emptyResult = new HoodieWriteMetadata<>();
+    emptyResult.setPartitionToReplaceFileIds(Collections.emptyMap());
+    emptyResult.setWriteStatuses(Collections.emptyList());
+    try {
+      PartitionTTLStrategy strategy = HoodiePartitionTTLStrategyFactory.createStrategy(table, config.getProps(), instantTime);
+      List<String> expiredPartitions = strategy.getExpiredPartitionPaths();
+      if (expiredPartitions.isEmpty()) {
+        return emptyResult;
+      }
+      LOG.info("Partition TTL found the following expired partitions to delete: " + String.join(",", expiredPartitions));
+      // Auto commit is disabled in the write config; copy the config and enable auto commit for FlinkDeletePartitionCommitActionExecutor.
+      HoodieWriteConfig autoCommitConfig = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).withAutoCommit(true).build();
+      return new FlinkDeletePartitionCommitActionExecutor<>(context, autoCommitConfig, table, instantTime, expiredPartitions).execute();
+    } catch (HoodieDeletePartitionPendingTableServiceException deletePartitionPendingTableServiceException) {
+      LOG.info("Partition is under a pending table service; skipping the delete, it will be retried on the next run.");
+      return emptyResult;
+    } catch (IOException e) {
+      throw new HoodieIOException("Error executing hoodie partition ttl: ", e);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriterWithPartitionTTl.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriterWithPartitionTTl.java
new file mode 100644
index 00000000000..95e87b410ff
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriterWithPartitionTTl.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.config.HoodieTTLConfig;
+import org.apache.hudi.sink.utils.TestWriteBase;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestData;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.fixInstantTimeCompatibility;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.instantTimePlusMillis;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for partition TTL.
+ */
+public class TestWriterWithPartitionTTl extends TestWriteBase {
+  // The original PartitionTTLStrategy computes the expiry time in DAYS, which is too long for a test.
+  // Override isPartitionExpired so that each configured day of TTL expires after one second.
+  public static class FlinkPartitionTTLTestStrategy extends KeepByTimeStrategy {
+    public FlinkPartitionTTLTestStrategy(HoodieTable hoodieTable, String instantTime) {
+      super(hoodieTable, instantTime);
+    }
+
+    @Override
+    protected boolean isPartitionExpired(String referenceTime) {
+      String expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 / 3600);
+      return fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0;
+    }
+  }
+
+  @Override
+  protected void setUp(Configuration conf) {
+    conf.setBoolean(HoodieTTLConfig.INLINE_PARTITION_TTL.key(), true);
+    conf.setString(HoodieTTLConfig.DAYS_RETAIN.key(), "1");
+    conf.setString(HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME.key(), FlinkPartitionTTLTestStrategy.class.getName());
+  }
+
+  @Test
+  public void testFlinkWriterWithPartitionTTL() throws Exception {
+    // open the function and ingest data
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART1)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .end();
+
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART2)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .end();
+
+    HoodieActiveTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline();
+    assertTrue(timeline.getCompletedReplaceTimeline().getInstants().size() > 0);
+    HoodieInstant replaceCommit = timeline.getCompletedReplaceTimeline().getInstants().get(0);
+    HoodieReplaceCommitMetadata commitMetadata = TimelineMetadataUtils.deserializeReplaceCommitMetadata(timeline.getInstantDetails(replaceCommit).get());
+    assertTrue(commitMetadata.getPartitionToReplaceFileIds().containsKey("par1"));
+  }
+}
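
A note on the time compression in FlinkPartitionTTLTestStrategy above:
assuming KeepByTimeStrategy derives ttlInMilis as daysRetain * 24 * 3600 * 1000,
a one-day TTL is 86,400,000 ms; dividing by 24 and then by 3600 leaves 1,000 ms,
so each configured day of retention expires after one second and the test needs
no explicit waiting.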
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index fd74cf07e05..7b14614cd37 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -351,6 +351,14 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_PART1 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
+  public static List<RowData> DATA_SET_PART2 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par2")));
+
   public static List<RowData> DATA_SET_SINGLE_DELETE = Collections.singletonList(
       deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(5), StringData.fromString("par1")));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
index 00614339362..495f9d39413 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
@@ -43,7 +43,7 @@ import org.apache.hudi.table.HoodieTable
 class HoodieSparkSqlWriterTestStrategy(hoodieTable: HoodieTable[_, _, _, _], instantTime: String)
   extends KeepByTimeStrategy(hoodieTable, instantTime) {
   override def isPartitionExpired(referenceTime: String): Boolean = {
-    val expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 / 60)
+    val expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 / 3600)
     fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0
   }
 }
@@ -77,8 +77,6 @@ class TestHoodieSparkSqlWriterPartitionTTL extends HoodieSparkWriterTestBase {
     val part1DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart1), structType)
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, part1DF)
-    // Wait for part1 expires.
-    Thread.sleep(60 * 1000)
     val recordsForPart2 = DataSourceTestUtils.generateRandomRowsByPartition(100, "part2")
     val recordsSeqForPart2 = convertRowListToSeq(recordsForPart2)
     val part2DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart2), structType)
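
With the divisor in HoodieSparkSqlWriterTestStrategy changed from 60 to 3600,
the effective per-day TTL in the Spark test drops from one minute
(86,400,000 / 24 / 60 = 60,000 ms) to one second (86,400,000 / 24 / 3600 =
1,000 ms), which is presumably why the 60-second Thread.sleep is no longer
needed.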