curcur commented on a change in pull request #14943:
URL: https://github.com/apache/flink/pull/14943#discussion_r584023097



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
##########
@@ -46,19 +46,24 @@ public static StateTtlConfig createTtlConfig(long 
retentionTime) {
         }
     }
 
-    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> 
stateBackend) {
+    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> 
keyedStateBackend) {
         // TODO: remove the hard code check once FLINK-21027 is supported
         // state key and value is immutable only when using rocksdb state 
backend and timer
+        KeyedStateBackend<?> rootKeyedStateBackend =
+                keyedStateBackend instanceof DelegateKeyedStateBackend
+                        ? ((DelegateKeyedStateBackend<?>) keyedStateBackend)
+                                .getDelegatedKeyedStateBackend()
+                        : keyedStateBackend;
+
         boolean isRocksDbState =
-                
ROCKSDB_KEYED_STATE_BACKEND.equals(stateBackend.getClass().getCanonicalName());
-        boolean isHeapTimer = false;
-        if (stateBackend instanceof AbstractKeyedStateBackend) {
-            // currently, requiresLegacySynchronousTimerSnapshots()
-            // indicates the underlying uses heap-bsased timer
-            isHeapTimer =
-                    ((AbstractKeyedStateBackend<?>) stateBackend)
-                            
.requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT);
-        }
+                ROCKSDB_KEYED_STATE_BACKEND.equals(
+                        rootKeyedStateBackend.getClass().getCanonicalName());

Review comment:
       The returned logic of isStateImmutableInStateBackend() is not the same 
as what requiresLegacySynchronousTimerSnapshots() returns, please check my 
highlights of the last comment.
   
   `requiresLegacySynchronousTimerSnapshots() == false` only indicates it does 
not use a heap timer; it does not indicate it is a rocksdb statebackend.
   
   the isStateImmutableInStateBackend() needs 
requiresLegacySynchronousTimerSnapshots() == false && uses rocks db




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to