This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cc81ddd [HUDI-1812] Add explicit index state TTL option for Flink writer (#2853)
cc81ddd is described below
commit cc81ddde01d6c743708e68c7fb6394facca1e672
Author: hiscat <[email protected]>
AuthorDate: Wed Apr 21 20:13:30 2021 +0800
[HUDI-1812] Add explicit index state TTL option for Flink writer (#2853)
---
.../src/main/java/org/apache/hudi/configuration/FlinkOptions.java | 6 ++++++
.../org/apache/hudi/sink/partitioner/BucketAssignFunction.java | 7 ++++++-
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 9925fc7..3a942af 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -80,6 +80,12 @@ public class FlinkOptions {
.defaultValue(false)
.withDescription("Whether to bootstrap the index state from existing hoodie table, default false");
+ public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
+ .key("index.state.ttl")
+ .doubleType()
+ .defaultValue(1.5D)
+ .withDescription("Index state ttl in days, default 1.5 day");
+
// ------------------------------------------------------------------------
// Read Options
// ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index f765e9d..79a3f44 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -37,17 +37,18 @@ import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.runtime.util.StateTtlConfigUtil;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -145,6 +146,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
"indexState",
TypeInformation.of(HoodieKey.class),
TypeInformation.of(HoodieRecordLocation.class));
+ double ttl = conf.getDouble(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
+ if (ttl > 0) {
+ indexStateDesc.enableTimeToLive(StateTtlConfigUtil.createTtlConfig((long) ttl));
+ }
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
if (bootstrapIndex) {
MapStateDescriptor<String, Integer> partitionLoadStateDesc =