rkhachatryan commented on a change in pull request #15200:
URL: https://github.com/apache/flink/pull/15200#discussion_r647574512



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
##########
@@ -39,53 +44,83 @@
         extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, 
V>>
         implements InternalListState<K, N, V> {
 
-    ChangelogListState(InternalListState<K, N, V> delegatedState) {
-        super(delegatedState);
+    ChangelogListState(
+            InternalListState<K, N, V> delegatedState,
+            KvStateChangeLogger<List<V>, N> changeLogger) {
+        super(delegatedState, changeLogger);
     }
 
     @Override
     public void update(List<V> values) throws Exception {
+        changeLogger.stateUpdated(values, getCurrentNamespace());
         delegatedState.update(values);
     }
 
     @Override
     public void addAll(List<V> values) throws Exception {
+        changeLogger.stateAdded(values, getCurrentNamespace());
         delegatedState.addAll(values);
     }
 
     @Override
     public void updateInternal(List<V> valueToStore) throws Exception {
+        changeLogger.stateUpdated(valueToStore, getCurrentNamespace());
         delegatedState.updateInternal(valueToStore);
     }

Review comment:
       TLDR: adding a new operation `SET_INTERNAL`
   
   As discussed offline: `update` treats the provided value as user-supplied 
and so it performs some checks and protective copying (e.g. for heap state 
backend).
   
   If left as is, then on recovery a different method might be called 
(`updateInternal` instead of `update` or vice versa). It probably won't cause 
any issues for now, but I'll separate these operations to be on the safe side.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to