rkhachatryan commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r647769972
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
##########
@@ -70,11 +72,26 @@ public KeyGroupRange getKeyGroupRange() {
@Nullable
@Override
public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
- throw new UnsupportedOperationException();
+ return
changes.stream().mapToInt(StateChange::getKeyGroup).anyMatch(keyGroupRange::contains)
+ ? this
+ : null;
}
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public String toString() {
+ return String.format("from %s to %s: %s", from, to, changes);
+ }
+
+ public long getFrom() {
+ return ((SequenceNumber.GenericSequenceNumber) from).number;
Review comment:
The idea is to be have different implementations of the changelog,
potentially with different sequencing. For example, once could include an epoch
and a number inside that epoch.
Adding `long number()` to `SequenceNumber` would expose it's implementation
detail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]