rkhachatryan commented on a change in pull request #15200:
URL: https://github.com/apache/flink/pull/15200#discussion_r640996532
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
##########
@@ -18,24 +18,36 @@
package org.apache.flink.state.changelog;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Collection;
import java.util.Set;
+import static org.apache.flink.state.changelog.StateChangeLogger.loggingIterator;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link KeyGroupedInternalPriorityQueue} that keeps state on the underlying delegated {@link
* KeyGroupedInternalPriorityQueue} as well as on the state change log.
*/
public class ChangelogKeyGroupedPriorityQueue<T> implements KeyGroupedInternalPriorityQueue<T> {
private final KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue;
+ private final PqStateChangeLogger<T> logger;
Review comment:
The state type is written as part of the metadata, and the state name is
written with each state change so that the change can be related to that
metadata.
The current key group is written in `AbstractStateChangeLogger.log`, so we
can differentiate between the key groups on rescaling.
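
To illustrate the layout described above, here is a minimal sketch. It is not the PR's actual code: the class `ChangelogRecordSketch`, the `StateType` enum, the method names and the byte layout are all hypothetical. It only assumes that metadata carries the state name and type once, and that every change record carries the state name and the key group.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch; names and byte layout are assumptions, not Flink's format.
final class ChangelogRecordSketch {

    /** Hypothetical state kinds, standing in for whatever the metadata records. */
    enum StateType { KEY_VALUE, PRIORITY_QUEUE }

    /** Metadata is written once per state: the state name and its type. */
    static byte[] writeMetadata(String stateName, StateType type) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(stateName);
            out.writeByte(type.ordinal());
        }
        return bytes.toByteArray();
    }

    /** Each change repeats the state name (to find the metadata) and the key group. */
    static byte[] writeChange(String stateName, int keyGroup, byte[] payload)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(stateName);
            out.writeInt(keyGroup);
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Reads back the state name of a change record. */
    static String readStateName(byte[] change) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(change))) {
            return in.readUTF();
        }
    }

    /** Reads back the key group of a change record. */
    static int readKeyGroup(byte[] change) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(change))) {
            in.readUTF(); // skip the state name
            return in.readInt();
        }
    }

    /** On rescaling, a task keeps only changes whose key group is in its range (inclusive). */
    static boolean inRange(byte[] change, int startKeyGroup, int endKeyGroup)
            throws IOException {
        int keyGroup = readKeyGroup(change);
        return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
    }
}
```

With a layout like this, a restoring task can drop changes outside its key-group range without deserializing the payload, and can look up the state type via the state name.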