This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc81ddd  [HUDI-1812] Add explicit index state TTL option for Flink writer (#2853)
cc81ddd is described below

commit cc81ddde01d6c743708e68c7fb6394facca1e672
Author: hiscat <[email protected]>
AuthorDate: Wed Apr 21 20:13:30 2021 +0800

    [HUDI-1812] Add explicit index state TTL option for Flink writer (#2853)
---
 .../src/main/java/org/apache/hudi/configuration/FlinkOptions.java  | 6 ++++++
 .../org/apache/hudi/sink/partitioner/BucketAssignFunction.java     | 7 ++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9925fc7..3a942af 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -80,6 +80,12 @@ public class FlinkOptions {
       .defaultValue(false)
       .withDescription("Whether to bootstrap the index state from existing 
hoodie table, default false");
 
+  public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
+      .key("index.state.ttl")
+      .doubleType()
+      .defaultValue(1.5D)
+      .withDescription("Index state TTL in days, default 1.5 days; a value <= 0 disables the TTL");
+
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index f765e9d..79a3f44 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -37,17 +37,18 @@ import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -145,6 +146,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
             "indexState",
             TypeInformation.of(HoodieKey.class),
             TypeInformation.of(HoodieRecordLocation.class));
+    double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 
1000;
+    if (ttl > 0) {
+      
indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
+    }
     indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
     if (bootstrapIndex) {
       MapStateDescriptor<String, Integer> partitionLoadStateDesc =

Reply via email to