rkhachatryan commented on a change in pull request #15200:
URL: https://github.com/apache/flink/pull/15200#discussion_r640991144



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
##########
@@ -67,18 +71,20 @@ public OUT get() throws Exception {
     @Override
     public void add(IN value) throws Exception {
         delegatedState.add(value);
+        changeLogger.stateUpdated(delegatedState.getInternal(), getCurrentNamespace());

Review comment:
       We don't know how the added value affected the accumulator, so we have to log the accumulator itself (using `logger.stateUpdated(delegated.getInternal())`).
   Since we log the accumulator state, there is no need to log the added value as well.
   
   The accumulator can be either bigger or smaller than the added value (e.g. a list or a counter, respectively). But we can't tell which, and for the reason above we can't log only the added value.
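
   To illustrate the reasoning above, here is a minimal, self-contained sketch. It is not the changelog backend's code: `SketchState`, its `changeLog` list, and `restore` are hypothetical names invented for this example, and the aggregate function is modeled as a plain `BiFunction`. The point is only that, with an opaque aggregate function, logging the resulting accumulator is enough to recover the state, whether the accumulator is smaller than the inputs (counter) or larger (list).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class AccumulatorLoggingSketch {

    // Hypothetical stand-in for an aggregating state whose aggregate function is opaque.
    static final class SketchState<IN, ACC> {
        private final BiFunction<ACC, IN, ACC> aggregate;
        private ACC accumulator;
        // Hypothetical change log: the full accumulator after every add().
        private final List<ACC> changeLog = new ArrayList<>();

        SketchState(ACC initial, BiFunction<ACC, IN, ACC> aggregate) {
            this.accumulator = initial;
            this.aggregate = aggregate;
        }

        void add(IN value) {
            accumulator = aggregate.apply(accumulator, value);
            // Log the resulting accumulator, not the input value.
            changeLog.add(accumulator);
        }

        // Recovery reads the last logged accumulator; the aggregate function is not replayed.
        ACC restore(ACC initial) {
            return changeLog.isEmpty() ? initial : changeLog.get(changeLog.size() - 1);
        }
    }

    // Counter: the accumulator (one long) is smaller than the added values.
    public static long restoredCount() {
        SketchState<String, Long> counter = new SketchState<>(0L, (acc, v) -> acc + 1);
        counter.add("a fairly long input record");
        counter.add("another record");
        return counter.restore(0L);
    }

    // List: the accumulator grows and is larger than any single added value.
    public static List<String> restoredList() {
        SketchState<String, List<String>> list = new SketchState<>(
                new ArrayList<>(),
                (acc, v) -> {
                    // Copy so that each logged snapshot stays independent of later updates.
                    List<String> next = new ArrayList<>(acc);
                    next.add(v);
                    return next;
                });
        list.add("a");
        list.add("b");
        return list.restore(new ArrayList<>());
    }

    public static void main(String[] args) {
        System.out.println(restoredCount());
        System.out.println(restoredList());
    }
}
```

   In the list case the sketch logs a growing snapshot on every update, which is the cost of not knowing what the aggregate function does with each value.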




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

