Myasuka commented on a change in pull request #19051:
URL: https://github.com/apache/flink/pull/19051#discussion_r826716970



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -145,9 +145,17 @@ protected void log(
             @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter,
             Ns ns)
             throws IOException {
+        log(op, dataWriter, ns, keyContext.getCurrentKeyGroupIndex());
+    }
+
+    protected void log(
+            StateChangeOperation op,
+            @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter,
+            Ns ns,
+            int keyGroup)

Review comment:
   > 1. There could be other places where eventTimeTimersQueue.poll() or remove is called. So it must be a (javadoc) contract that setCurrentKey is required before poll
   
   As `processingTimeTimersQueue` is only used internally, I think refining the documentation and description is okay.
   
   > 2. Following the above contract, setCurrentKey should also be called on recovery (by PriorityQueueStateChangeApplier - similar to KvStateChangeApplier)
   
   I think we don't need to set the current key on recovery; the root cause of this problem is that the correct key group was not recorded at logging time. We could actually filter out key groups that fall outside the current range in the [condition check](https://github.com/apache/flink/blob/a0d31c5e0914d8e759917a72ca7b667d3db2f1d2/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java#L95). However, because the correct key group was not recorded, some wrong data could still pass that filter.
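   To make this concrete, below is a minimal sketch of the key-group range check idea (the class and method names are made up for illustration; this is not the actual `ChangelogBackendLogApplier` code). The check can only drop out-of-range changes if the key group written at logging time was correct, which is why passing an explicit `keyGroup` to `log(...)` matters:

```java
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Hypothetical helper, only for illustrating the reasoning above.
final class KeyGroupFilterSketch {

    /** Returns true if a logged change with this key group should be applied by this backend. */
    static boolean belongsToThisBackend(int recordedKeyGroup, KeyGroupRange localRange) {
        // If the logging side recorded the key group of the current key context
        // instead of the key group derived from the element's own key, a wrong
        // key group may still fall inside localRange and pass this check.
        return localRange.contains(recordedKeyGroup);
    }

    /** On the logging side, the key group would be derived from the element's own key. */
    static int keyGroupOf(Object elementKey, int maxParallelism) {
        return KeyGroupRangeAssignment.assignToKeyGroup(elementKey, maxParallelism);
    }
}
```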
   
   



