This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f3db4220f5c8730e065734cff16237c7743b390f Author: Jark Wu <[email protected]> AuthorDate: Tue Jan 19 19:27:39 2021 +0800 [hotfix][table-runtime-blink] Rename StateTtlConfigUtil to StateConfigUtil --- .../table/runtime/operators/aggregate/GroupAggFunction.java | 2 +- .../runtime/operators/aggregate/GroupTableAggFunction.java | 2 +- .../operators/aggregate/MiniBatchGlobalGroupAggFunction.java | 2 +- .../operators/aggregate/MiniBatchGroupAggFunction.java | 2 +- .../aggregate/MiniBatchIncrementalGroupAggFunction.java | 2 +- .../operators/deduplicate/DeduplicateFunctionBase.java | 2 +- .../deduplicate/MiniBatchDeduplicateFunctionBase.java | 2 +- .../operators/join/stream/state/JoinRecordStateViews.java | 2 +- .../join/stream/state/OuterJoinRecordStateViews.java | 2 +- .../util/{StateTtlConfigUtil.java => StateConfigUtil.java} | 12 +++++++++++- 10 files changed, 20 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java index d6dd6d2..716413d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java @@ -37,7 +37,7 @@ import org.apache.flink.util.Collector; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; import static org.apache.flink.table.data.util.RowDataUtil.isRetractMsg; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** Aggregate Function used for the groupby (without window) aggregate. */ public class GroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java index 8911b18..e951e2f 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java @@ -32,7 +32,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.util.Collector; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** Aggregate Function used for the groupby (without window) table aggregate. */ public class GroupTableAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java index 3de0037..8832498 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java @@ -39,7 +39,7 @@ import javax.annotation.Nullable; import java.util.Map; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** Aggregate Function used for the global groupby (without window) aggregate in miniBatch mode. */ public class MiniBatchGlobalGroupAggFunction diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java index f0dce9d..fcc5b82 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java @@ -47,7 +47,7 @@ import java.util.Map; import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg; import static org.apache.flink.table.data.util.RowDataUtil.isRetractMsg; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** * Aggregate Function used for the groupby (without window) aggregate in miniBatch mode. diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java index a61a7f3..8030b27 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java @@ -34,7 +34,7 @@ import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** * Aggregate Function used for the incremental groupby (without window) aggregate in miniBatch mode. diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java index 6a02758..fa6b04a 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** * Base class for deduplicate function. diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java index 474bab5..fe4c0e2 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.runtime.context.ExecutionContext; import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** * Base class for miniBatch deduplicate function. diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java index 2f4c833..3c8280c 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java @@ -35,7 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; import static org.apache.flink.util.Preconditions.checkNotNull; /** Utility to create a {@link JoinRecordStateView} depends on {@link JoinInputSideSpec}. */ diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java index 178d0c6..a88d5d2 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java @@ -37,7 +37,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig; +import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; import static org.apache.flink.util.Preconditions.checkNotNull; /** Utility to create a {@link OuterJoinRecordStateViews} depends on {@link JoinInputSideSpec}. */ diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java similarity index 74% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java index 98d6301..a27a09d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java @@ -20,9 +20,13 @@ package org.apache.flink.table.runtime.util; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.state.KeyedStateBackend; /** Utility to create a {@link StateTtlConfig} object. */ -public class StateTtlConfigUtil { +public class StateConfigUtil { + + private static final String ROCKSDB_KEYED_STATE_BACKEDN = + "org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend"; /** * Creates a {@link StateTtlConfig} depends on retentionTime parameter. @@ -39,4 +43,10 @@ public class StateTtlConfigUtil { return StateTtlConfig.DISABLED; } } + + public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> stateBackend) { + // TODO: remove this once FLINK-21027 is supported + // state key and value is immutable only when using rocksdb state backend + return ROCKSDB_KEYED_STATE_BACKEDN.equals(stateBackend.getClass().getCanonicalName()); + } }
