This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3db4220f5c8730e065734cff16237c7743b390f
Author: Jark Wu <[email protected]>
AuthorDate: Tue Jan 19 19:27:39 2021 +0800

    [hotfix][table-runtime-blink] Rename StateTtlConfigUtil to StateConfigUtil
---
 .../table/runtime/operators/aggregate/GroupAggFunction.java  |  2 +-
 .../runtime/operators/aggregate/GroupTableAggFunction.java   |  2 +-
 .../operators/aggregate/MiniBatchGlobalGroupAggFunction.java |  2 +-
 .../operators/aggregate/MiniBatchGroupAggFunction.java       |  2 +-
 .../aggregate/MiniBatchIncrementalGroupAggFunction.java      |  2 +-
 .../operators/deduplicate/DeduplicateFunctionBase.java       |  2 +-
 .../deduplicate/MiniBatchDeduplicateFunctionBase.java        |  2 +-
 .../operators/join/stream/state/JoinRecordStateViews.java    |  2 +-
 .../join/stream/state/OuterJoinRecordStateViews.java         |  2 +-
 .../util/{StateTtlConfigUtil.java => StateConfigUtil.java}   | 12 +++++++++++-
 10 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
index d6dd6d2..716413d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java
@@ -37,7 +37,7 @@ import org.apache.flink.util.Collector;
 
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
 import static org.apache.flink.table.data.util.RowDataUtil.isRetractMsg;
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /** Aggregate Function used for the groupby (without window) aggregate. */
 public class GroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java
index 8911b18..e951e2f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupTableAggFunction.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Collector;
 
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /** Aggregate Function used for the groupby (without window) table aggregate. */
 public class GroupTableAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java
index 3de0037..8832498 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java
@@ -39,7 +39,7 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /** Aggregate Function used for the global groupby (without window) aggregate in miniBatch mode. */
 public class MiniBatchGlobalGroupAggFunction
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java
index f0dce9d..fcc5b82 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java
@@ -47,7 +47,7 @@ import java.util.Map;
 
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
 import static org.apache.flink.table.data.util.RowDataUtil.isRetractMsg;
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /**
  * Aggregate Function used for the groupby (without window) aggregate in miniBatch mode.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java
index a61a7f3..8030b27 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchIncrementalGroupAggFunction.java
@@ -34,7 +34,7 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /**
  * Aggregate Function used for the incremental groupby (without window) aggregate in miniBatch mode.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
index 6a02758..fa6b04a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /**
  * Base class for deduplicate function.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
index 474bab5..fe4c0e2 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/MiniBatchDeduplicateFunctionBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.runtime.context.ExecutionContext;
 import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
 
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /**
  * Base class for miniBatch deduplicate function.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java
index 2f4c833..3c8280c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java
@@ -35,7 +35,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Utility to create a {@link JoinRecordStateView} depends on {@link JoinInputSideSpec}. */
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java
index 178d0c6..a88d5d2 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/OuterJoinRecordStateViews.java
@@ -37,7 +37,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.util.StateTtlConfigUtil.createTtlConfig;
+import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Utility to create a {@link OuterJoinRecordStateViews} depends on {@link JoinInputSideSpec}. */
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
similarity index 74%
rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java
rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
index 98d6301..a27a09d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
@@ -20,9 +20,13 @@ package org.apache.flink.table.runtime.util;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 
 /** Utility to create a {@link StateTtlConfig} object. */
-public class StateTtlConfigUtil {
+public class StateConfigUtil {
+
+    private static final String ROCKSDB_KEYED_STATE_BACKEDN =
+            "org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend";
 
     /**
      * Creates a {@link StateTtlConfig} depends on retentionTime parameter.
@@ -39,4 +43,10 @@ public class StateTtlConfigUtil {
             return StateTtlConfig.DISABLED;
         }
     }
+
+    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> stateBackend) {
+        // TODO: remove this once FLINK-21027 is supported
+        // state key and value is immutable only when using rocksdb state backend
+        return ROCKSDB_KEYED_STATE_BACKEDN.equals(stateBackend.getClass().getCanonicalName());
+    }
 }

Reply via email to