Myasuka commented on a change in pull request #16404:
URL: https://github.com/apache/flink/pull/16404#discussion_r665319037



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
##########
@@ -107,16 +112,36 @@ private static void applyMetaDataChange(
         }
     }
 
+    private static Optional<StateTtlConfig> readTtlConfig(DataInputView in) throws IOException {

Review comment:
       Since `StateTtlConfig` already expresses whether TTL is enabled or 
disabled, why do we need to introduce an `Optional` here? If the goal is to 
reduce the amount of data written during serialization, you can simply return 
`StateTtlConfig.DISABLED` here when the flag that is read is false.
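
   To illustrate the suggestion, here is a minimal, self-contained sketch. 
It does not use the real Flink classes: `TtlConfigSketch` is a hypothetical 
stand-in for `StateTtlConfig`, and the wire format (one boolean flag followed 
by a single TTL value) is an assumption made only for this example. The point 
is that the reader can map a `false` flag directly to the `DISABLED` sentinel, 
so callers never need to unwrap an `Optional`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for StateTtlConfig; not the real Flink class. */
final class TtlConfigSketch {
    /** Sentinel that plays the role of StateTtlConfig.DISABLED. */
    static final TtlConfigSketch DISABLED = new TtlConfigSketch(Long.MAX_VALUE);

    final long ttlMillis;

    TtlConfigSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    boolean isEnabled() {
        return this != DISABLED;
    }

    /** Writes only a flag when TTL is disabled (assumed format, for illustration). */
    static void write(TtlConfigSketch config, DataOutputStream out) throws IOException {
        out.writeBoolean(config.isEnabled());
        if (config.isEnabled()) {
            out.writeLong(config.ttlMillis);
        }
    }

    /** Returns the DISABLED sentinel instead of an empty Optional. */
    static TtlConfigSketch read(DataInputStream in) throws IOException {
        return in.readBoolean() ? new TtlConfigSketch(in.readLong()) : DISABLED;
    }

    /** Serializes and deserializes a config in memory. */
    static TtlConfigSketch roundTrip(TtlConfigSketch config) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                write(config, out);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return read(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   With this shape, a disabled config still costs only one byte on the wire, 
and the caller can check `isEnabled()` on the returned object directly.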

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
##########
@@ -137,32 +139,41 @@ private IS createState() throws Exception {
     @SuppressWarnings("unchecked")
     private IS createValueState() throws Exception {
         ValueStateDescriptor<TtlValue<SV>> ttlDescriptor =
-                new ValueStateDescriptor<>(
-                        stateDesc.getName(),
-                        new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
+                stateDesc.getSerializer() instanceof TtlSerializer

Review comment:
       Would this problem still exist if the changelog state backend were not 
introduced?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -124,6 +124,9 @@
      */
     private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
 
+    /** Unwrapped changelog states used for recovery (not wrapped into e.g. TTL). */
+    private final HashMap<String, ChangelogState> changelogStates;

Review comment:
       When will `changelogStates` be cleared?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -533,7 +541,7 @@ public ChangelogState getExistingState(String name, BackendStateType type)
         ChangelogState state;
         switch (type) {
             case KEY_VALUE:
-                state = (ChangelogState) keyValueStatesByName.get(name);
+                state = changelogStates.get(name);

Review comment:
       If I understand correctly, `changelogStates` contains states that are 
already TTL states if TTL was previously enabled and they have the TTL 
serializer?
   
   I think this logic is a bit complex, and we should describe it in the 
Javadoc. Moreover, how about renaming `getExistingState` to something like 
`getRestoredDelegatedState` to make the semantics clearer?




