rkhachatryan commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r647769972



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
##########
@@ -70,11 +72,26 @@ public KeyGroupRange getKeyGroupRange() {
     @Nullable
     @Override
     public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-        throw new UnsupportedOperationException();
+        return 
changes.stream().mapToInt(StateChange::getKeyGroup).anyMatch(keyGroupRange::contains)
+                ? this
+                : null;
     }
 
     @Override
     public void registerSharedStates(SharedStateRegistry stateRegistry) {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public String toString() {
+        return String.format("from %s to %s: %s", from, to, changes);
+    }
+
+    public long getFrom() {
+        return ((SequenceNumber.GenericSequenceNumber) from).number;

Review comment:
       The idea is to be have different implementations of the changelog, 
potentially with different sequencing. For example, once could include an epoch 
and a number inside that epoch.
   
   Adding `long number()` to `SequenceNumber` would expose it's implementation 
detail. 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to