Myasuka commented on a change in pull request #19051:
URL: https://github.com/apache/flink/pull/19051#discussion_r826716970
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -145,9 +145,17 @@ protected void log(
@Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter,
Ns ns)
throws IOException {
+ log(op, dataWriter, ns, keyContext.getCurrentKeyGroupIndex());
+ }
+
+ protected void log(
+ StateChangeOperation op,
+ @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter,
+ Ns ns,
+ int keyGroup)
Review comment:
> 1. There could be other places where eventTimeTimersQueue.poll() or
> remove is called. So it must be a (javadoc) contract that setCurrentKey is
> required before poll
As `processingTimeTimersQueue` is only used internally, I think refining the
doc and description is okay.
> 2. Following the above contract, setCurrentKey should also be called on
> recovery (by PriorityQueueStateChangeApplier - similar to KvStateChangeApplier)
I think we don't need to set the current key on recovery. The root cause of
this problem is that the correct key group was not recorded when logging. We
could actually filter out key groups outside the current range in the [condition
check](https://github.com/apache/flink/blob/a0d31c5e0914d8e759917a72ca7b667d3db2f1d2/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java#L95).
However, because the correct key group was not recorded, some wrong data could
still pass the filter.
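To illustrate why the filter alone is not enough, here is a minimal, self-contained sketch. It is not Flink code: `KeyGroupFilterSketch`, `Change`, and `filterForRange` are hypothetical names, and the inclusive key-group range is an assumption made for the example. It only shows that a range filter on recovery is as reliable as the key group written at logging time.

```java
import java.util.ArrayList;
import java.util.List;

class KeyGroupFilterSketch {

    /** Hypothetical stand-in for one logged state change. */
    public static final class Change {
        public final int keyGroup;   // key group recorded when the change was logged
        public final String payload; // opaque change data

        public Change(int keyGroup, String payload) {
            this.keyGroup = keyGroup;
            this.payload = payload;
        }
    }

    /** Keeps only changes whose recorded key group lies in [start, end] (assumed inclusive). */
    public static List<Change> filterForRange(List<Change> log, int start, int end) {
        List<Change> kept = new ArrayList<>();
        for (Change c : log) {
            if (c.keyGroup >= start && c.keyGroup <= end) {
                kept.add(c);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Change> log = new ArrayList<>();
        // Suppose this timer belongs to key group 7, but was logged with the
        // key group of a stale "current key" (2) instead of its own.
        log.add(new Change(2, "timer-A (really key group 7)"));
        // This timer was logged with its own key group.
        log.add(new Change(7, "timer-B"));

        // A subtask that owns key groups 0..3 replays the log.
        for (Change c : filterForRange(log, 0, 3)) {
            System.out.println("applied: " + c.payload);
        }
    }
}
```

In this sketch the subtask applies `timer-A` even though it does not own that timer's real key group, which is the "wrong data passes the filter" case. Passing the element's own key group to `log(...)`, as the new overload allows, keeps the recorded value and the filter consistent.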