This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fdb94192508 [HUDI-7715] Partition TTL for Flink (#11156)
fdb94192508 is described below

commit fdb94192508a3d76fdba63429d9b0df718316a7e
Author: Manu <[email protected]>
AuthorDate: Tue May 7 09:00:41 2024 +0800

    [HUDI-7715] Partition TTL for Flink (#11156)
---
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  3 +-
 .../commit/FlinkPartitionTTLActionExecutor.java    | 73 ++++++++++++++++++
 .../hudi/sink/TestWriterWithPartitionTTl.java      | 89 ++++++++++++++++++++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  8 ++
 .../TestHoodieSparkSqlWriterPartitionTTL.scala     |  4 +-
 5 files changed, 173 insertions(+), 4 deletions(-)
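
For context, this change wires Hudi's partition TTL management into the Flink write path. Below is a minimal sketch of how a Flink writer configuration might enable it, assuming the same HoodieTTLConfig keys exercised by the new test further down also apply to a regular pipeline; the helper class and method names here are illustrative only, not part of this commit:

    import org.apache.flink.configuration.Configuration;

    import org.apache.hudi.config.HoodieTTLConfig;

    public final class PartitionTtlOptionsSketch {
      // Enables inline partition TTL so expired partitions are deleted by the writer itself,
      // keeping partitions for the given number of days (day-based retention as in KeepByTimeStrategy).
      public static Configuration withPartitionTtl(Configuration conf, int daysRetain) {
        conf.setBoolean(HoodieTTLConfig.INLINE_PARTITION_TTL.key(), true);
        conf.setString(HoodieTTLConfig.DAYS_RETAIN.key(), String.valueOf(daysRetain));
        return conf;
      }
    }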

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 1ea69d3a109..4fd217ce4bd 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -61,6 +61,7 @@ import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkPartitionTTLActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
@@ -398,7 +399,7 @@ public class HoodieFlinkCopyOnWriteTable<T>
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> managePartitionTTL(HoodieEngineContext context, String instantTime) {
-    throw new HoodieNotSupportedException("Manage partition ttl is not supported yet");
+    return new FlinkPartitionTTLActionExecutor(context, config, this, instantTime).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
new file mode 100644
index 00000000000..f167fb5a916
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkPartitionTTLActionExecutor.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieDeletePartitionPendingTableServiceException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.ttl.strategy.HoodiePartitionTTLStrategyFactory;
+import org.apache.hudi.table.action.ttl.strategy.PartitionTTLStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class FlinkPartitionTTLActionExecutor<T> extends BaseFlinkCommitActionExecutor<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkPartitionTTLActionExecutor.class);
+
+  public FlinkPartitionTTLActionExecutor(HoodieEngineContext context,
+                                         HoodieWriteConfig config,
+                                         HoodieTable table,
+                                         String instantTime) {
+    super(context, null, config, table, instantTime, WriteOperationType.DELETE_PARTITION);
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    HoodieWriteMetadata<List<WriteStatus>> emptyResult = new HoodieWriteMetadata<>();
+    emptyResult.setPartitionToReplaceFileIds(Collections.emptyMap());
+    emptyResult.setWriteStatuses(Collections.emptyList());
+    try {
+      PartitionTTLStrategy strategy = HoodiePartitionTTLStrategyFactory.createStrategy(table, config.getProps(), instantTime);
+      List<String> expiredPartitions = strategy.getExpiredPartitionPaths();
+      if (expiredPartitions.isEmpty()) {
+        return emptyResult;
+      }
+      LOG.info("Partition TTL found the following expired partitions to delete: " + String.join(",", expiredPartitions));
+      // Auto commit is disabled in the write config; copy it and enable auto commit for FlinkDeletePartitionCommitActionExecutor.
+      HoodieWriteConfig autoCommitConfig = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).withAutoCommit(true).build();
+      return new FlinkDeletePartitionCommitActionExecutor<>(context, autoCommitConfig, table, instantTime, expiredPartitions).execute();
+    } catch (HoodieDeletePartitionPendingTableServiceException deletePartitionPendingTableServiceException) {
+      LOG.info("Partition is under a pending table service, doing nothing; the delete will be retried next time.");
+      return emptyResult;
+    } catch (IOException e) {
+      throw new HoodieIOException("Error executing hoodie partition ttl: ", e);
+    }
+  }
+}
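
The executor above is what HoodieFlinkCopyOnWriteTable#managePartitionTTL now delegates to (first hunk). A hedged call-site sketch, assuming a HoodieFlinkCopyOnWriteTable and engine context are already at hand; the wrapper class and method here are illustrative, not part of this commit:

    import java.util.List;

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
    import org.apache.hudi.table.action.HoodieWriteMetadata;

    final class PartitionTtlTriggerSketch {
      // Triggers partition TTL for the given instant; with this commit the call runs
      // FlinkPartitionTTLActionExecutor instead of throwing HoodieNotSupportedException.
      // Expired partitions are replaced via FlinkDeletePartitionCommitActionExecutor;
      // otherwise an empty result (no replaced file ids, no write statuses) is returned.
      static HoodieWriteMetadata<List<WriteStatus>> trigger(
          HoodieFlinkCopyOnWriteTable<?> table, HoodieEngineContext context, String instantTime) {
        return table.managePartitionTTL(context, instantTime);
      }
    }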
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriterWithPartitionTTl.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriterWithPartitionTTl.java
new file mode 100644
index 00000000000..95e87b410ff
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriterWithPartitionTTl.java
@@ -0,0 +1,89 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.config.HoodieTTLConfig;
+import org.apache.hudi.sink.utils.TestWriteBase;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.ttl.strategy.KeepByTimeStrategy;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestData;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.fixInstantTimeCompatibility;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.instantTimePlusMillis;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for partition TTL.
+ */
+public class TestWriterWithPartitionTTl extends TestWriteBase {
+  // The base PartitionTTLStrategy computes the expiration time in days, which is far too long for a test.
+  // Override isPartitionExpired so that one configured day of retention expires after roughly one second.
+  public static class FlinkPartitionTTLTestStrategy extends KeepByTimeStrategy {
+    public FlinkPartitionTTLTestStrategy(HoodieTable hoodieTable, String instantTime) {
+      super(hoodieTable, instantTime);
+    }
+
+    @Override
+    protected boolean isPartitionExpired(String referenceTime) {
+      String expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 / 3600);
+      return fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0;
+    }
+  }
+
+  @Override
+  protected void setUp(Configuration conf) {
+    conf.setBoolean(HoodieTTLConfig.INLINE_PARTITION_TTL.key(), true);
+    conf.setString(HoodieTTLConfig.DAYS_RETAIN.key(), "1");
+    conf.setString(HoodieTTLConfig.PARTITION_TTL_STRATEGY_CLASS_NAME.key(), FlinkPartitionTTLTestStrategy.class.getName());
+  }
+
+  @Test
+  public void testFlinkWriterWithPartitionTTL() throws Exception {
+    // open the function and ingest data
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART1)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .end();
+
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART2)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .end();
+
+    HoodieActiveTimeline timeline = StreamerUtil.createMetaClient(conf).getActiveTimeline();
+    assertTrue(timeline.getCompletedReplaceTimeline().getInstants().size() > 0);
+    HoodieInstant replaceCommit = timeline.getCompletedReplaceTimeline().getInstants().get(0);
+    HoodieReplaceCommitMetadata commitMetadata = TimelineMetadataUtils.deserializeReplaceCommitMetadata(timeline.getInstantDetails(replaceCommit).get());
+    assertTrue(commitMetadata.getPartitionToReplaceFileIds().containsKey("par1"));
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index fd74cf07e05..7b14614cd37 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -351,6 +351,14 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_PART1 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+
+  public static List<RowData> DATA_SET_PART2 = Collections.singletonList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par2")));
+
   public static List<RowData> DATA_SET_SINGLE_DELETE = Collections.singletonList(
       deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(5), StringData.fromString("par1")));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
index 00614339362..495f9d39413 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterPartitionTTL.scala
@@ -43,7 +43,7 @@ import org.apache.hudi.table.HoodieTable
 class HoodieSparkSqlWriterTestStrategy(hoodieTable: HoodieTable[_, _, _, _], instantTime: String)
   extends KeepByTimeStrategy(hoodieTable, instantTime) {
   override def isPartitionExpired(referenceTime: String): Boolean = {
-    val expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 / 60)
+    val expiredTime = instantTimePlusMillis(referenceTime, ttlInMilis / 24 / 3600)
     fixInstantTimeCompatibility(instantTime).compareTo(expiredTime) > 0
   }
 }
@@ -77,8 +77,6 @@ class TestHoodieSparkSqlWriterPartitionTTL extends HoodieSparkWriterTestBase {
     val part1DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart1), structType)
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, part1DF)
 
-    // Wait for part1 expires.
-    Thread.sleep(60 * 1000)
     val recordsForPart2 = DataSourceTestUtils.generateRandomRowsByPartition(100, "part2")
     val recordsSeqForPart2 = convertRowListToSeq(recordsForPart2)
     val part2DF = spark.createDataFrame(sc.parallelize(recordsSeqForPart2), structType)
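
A note on the two Spark-side hunks above, assuming ttlInMilis in KeepByTimeStrategy is the configured retention in days converted to milliseconds: with one day of retention, ttlInMilis = 24 * 3600 * 1000 = 86,400,000 ms. The old test divisor (/ 24 / 60) shrank the expiration window to 60,000 ms, i.e. one minute, which is why the test had to sleep for 60 seconds; the new divisor (/ 24 / 3600) shrinks it to 1,000 ms, so the Thread.sleep can be removed.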
