This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9233040e1c09b7cf631c710169bbaca68b654503
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Jun 24 16:05:39 2021 +0200

    [hotfix][state/changelog] Implement InMemoryStateChangelogHandle.getKeyGroupRange
---
 .../checkpoint/metadata/MetadataV2V3SerializerBase.java    |  7 ++++++-
 .../changelog/inmemory/InMemoryStateChangelogHandle.java   | 14 ++++++++++----
 .../changelog/inmemory/InMemoryStateChangelogWriter.java   | 10 +++++++++-
 .../inmemory/InMemoryStateChangelogWriterFactory.java      |  2 +-
 4 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 886ab37..96961ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -318,6 +318,8 @@ public abstract class MetadataV2V3SerializerBase {
         } else if (stateHandle instanceof InMemoryStateChangelogHandle) {
             InMemoryStateChangelogHandle handle = 
(InMemoryStateChangelogHandle) stateHandle;
             dos.writeByte(CHANGELOG_BYTE_INCREMENT_HANDLE);
+            dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
+            dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
             dos.writeLong(handle.getFrom());
             dos.writeLong(handle.getTo());
             List<StateChange> list = new ArrayList<>();
@@ -406,6 +408,9 @@ public abstract class MetadataV2V3SerializerBase {
                     metaDataStateHandle);
 
         } else if (CHANGELOG_BYTE_INCREMENT_HANDLE == type) {
+            int start = dis.readInt();
+            int numKeyGroups = dis.readInt();
+            KeyGroupRange keyGroupRange = KeyGroupRange.of(start, start + 
numKeyGroups - 1);
             long from = dis.readLong();
             long to = dis.readLong();
             int size = dis.readInt();
@@ -417,7 +422,7 @@ public abstract class MetadataV2V3SerializerBase {
                 dis.read(bytes);
                 changes.add(new StateChange(keyGroup, bytes));
             }
-            return new InMemoryStateChangelogHandle(changes, from, to);
+            return new InMemoryStateChangelogHandle(changes, from, to, 
keyGroupRange);
 
         } else if (CHANGELOG_FILE_INCREMENT_HANDLE == type) {
             int start = dis.readInt();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
index e71cac2..0896dc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
@@ -39,16 +39,22 @@ public class InMemoryStateChangelogHandle implements 
StateChangelogHandle<Void>
     private final List<StateChange> changes;
     private final SequenceNumber from; // for debug purposes
     private final SequenceNumber to; // for debug purposes
+    private final KeyGroupRange keyGroupRange;
 
-    public InMemoryStateChangelogHandle(List<StateChange> changes, long from, 
long to) {
-        this(changes, SequenceNumber.of(from), SequenceNumber.of(to));
+    public InMemoryStateChangelogHandle(
+            List<StateChange> changes, long from, long to, KeyGroupRange 
keyGroupRange) {
+        this(changes, SequenceNumber.of(from), SequenceNumber.of(to), 
keyGroupRange);
     }
 
     public InMemoryStateChangelogHandle(
-            List<StateChange> changes, SequenceNumber from, SequenceNumber to) 
{
+            List<StateChange> changes,
+            SequenceNumber from,
+            SequenceNumber to,
+            KeyGroupRange keyGroupRange) {
         this.changes = changes;
         this.from = from;
         this.to = to;
+        this.keyGroupRange = keyGroupRange;
     }
 
     @Override
@@ -66,7 +72,7 @@ public class InMemoryStateChangelogHandle implements 
StateChangelogHandle<Void>
 
     @Override
     public KeyGroupRange getKeyGroupRange() {
-        throw new UnsupportedOperationException();
+        return keyGroupRange;
     }
 
     @Nullable
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index cb5e953..fa99f69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -46,9 +47,14 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryState
 
     private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> 
changesByKeyGroup =
             new HashMap<>();
+    private final KeyGroupRange keyGroupRange;
     private long sqn = 0L;
     private boolean closed;
 
+    public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) {
+        this.keyGroupRange = keyGroupRange;
+    }
+
     @Override
     public void append(int keyGroup, byte[] value) {
         Preconditions.checkState(!closed, "LogWriter is closed");
@@ -67,7 +73,9 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryState
     public CompletableFuture<InMemoryStateChangelogHandle> 
persist(SequenceNumber from) {
         LOG.debug("Persist after {}", from);
         Preconditions.checkNotNull(from);
-        return completedFuture(new 
InMemoryStateChangelogHandle(collectChanges(from), from, 
SequenceNumber.of(sqn)));
+        return completedFuture(
+                new InMemoryStateChangelogHandle(
+                        collectChanges(from), from, SequenceNumber.of(sqn), 
keyGroupRange));
     }
 
     private List<StateChange> collectChanges(SequenceNumber after) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
index 61f5109..255de2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
@@ -27,6 +27,6 @@ public class InMemoryStateChangelogWriterFactory
     @Override
     public InMemoryStateChangelogWriter createWriter(
             String operatorID, KeyGroupRange keyGroupRange) {
-        return new InMemoryStateChangelogWriter();
+        return new InMemoryStateChangelogWriter(keyGroupRange);
     }
 }

Reply via email to