This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9233040e1c09b7cf631c710169bbaca68b654503 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Jun 24 16:05:39 2021 +0200 [hotfix][state/changelog] Implement InMemoryChangelogStateHandle.getKeyGroupRange --- .../checkpoint/metadata/MetadataV2V3SerializerBase.java | 7 ++++++- .../changelog/inmemory/InMemoryStateChangelogHandle.java | 14 ++++++++++---- .../changelog/inmemory/InMemoryStateChangelogWriter.java | 10 +++++++++- .../inmemory/InMemoryStateChangelogWriterFactory.java | 2 +- 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 886ab37..96961ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -318,6 +318,8 @@ public abstract class MetadataV2V3SerializerBase { } else if (stateHandle instanceof InMemoryStateChangelogHandle) { InMemoryStateChangelogHandle handle = (InMemoryStateChangelogHandle) stateHandle; dos.writeByte(CHANGELOG_BYTE_INCREMENT_HANDLE); + dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup()); + dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups()); dos.writeLong(handle.getFrom()); dos.writeLong(handle.getTo()); List<StateChange> list = new ArrayList<>(); @@ -406,6 +408,9 @@ public abstract class MetadataV2V3SerializerBase { metaDataStateHandle); } else if (CHANGELOG_BYTE_INCREMENT_HANDLE == type) { + int start = dis.readInt(); + int numKeyGroups = dis.readInt(); + KeyGroupRange keyGroupRange = KeyGroupRange.of(start, start + numKeyGroups - 1); long from = dis.readLong(); long to = dis.readLong(); int size = dis.readInt(); @@ -417,7 +422,7 @@ public abstract class MetadataV2V3SerializerBase { dis.read(bytes); changes.add(new StateChange(keyGroup, bytes)); } - return new InMemoryStateChangelogHandle(changes, from, to); + return new InMemoryStateChangelogHandle(changes, from, to, keyGroupRange); } else if (CHANGELOG_FILE_INCREMENT_HANDLE == type) { int start = dis.readInt(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java index e71cac2..0896dc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java @@ -39,16 +39,22 @@ public class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> private final List<StateChange> changes; private final SequenceNumber from; // for debug purposes private final SequenceNumber to; // for debug purposes + private final KeyGroupRange keyGroupRange; - public InMemoryStateChangelogHandle(List<StateChange> changes, long from, long to) { - this(changes, SequenceNumber.of(from), SequenceNumber.of(to)); + public InMemoryStateChangelogHandle( + List<StateChange> changes, long from, long to, KeyGroupRange keyGroupRange) { + this(changes, SequenceNumber.of(from), SequenceNumber.of(to), keyGroupRange); } public InMemoryStateChangelogHandle( - List<StateChange> changes, SequenceNumber from, SequenceNumber to) { + List<StateChange> changes, + SequenceNumber from, + SequenceNumber to, + KeyGroupRange keyGroupRange) { this.changes = changes; this.from = from; this.to = to; + this.keyGroupRange = keyGroupRange; } @Override @@ -66,7 +72,7 @@ public class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> @Override public KeyGroupRange getKeyGroupRange() { - throw new UnsupportedOperationException(); + return keyGroupRange; } @Nullable diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index cb5e953..fa99f69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; @@ -46,9 +47,14 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryState private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup = new HashMap<>(); + private final KeyGroupRange keyGroupRange; private long sqn = 0L; private boolean closed; + public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) { + this.keyGroupRange = keyGroupRange; + } + @Override public void append(int keyGroup, byte[] value) { Preconditions.checkState(!closed, "LogWriter is closed"); @@ -67,7 +73,9 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryState public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber from) { LOG.debug("Persist after {}", from); Preconditions.checkNotNull(from); - return completedFuture(new InMemoryStateChangelogHandle(collectChanges(from), from, SequenceNumber.of(sqn))); + return completedFuture( + new InMemoryStateChangelogHandle( + collectChanges(from), from, SequenceNumber.of(sqn), keyGroupRange)); } private List<StateChange> collectChanges(SequenceNumber after) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java index 61f5109..255de2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java @@ -27,6 +27,6 @@ public class InMemoryStateChangelogWriterFactory @Override public InMemoryStateChangelogWriter createWriter( String operatorID, KeyGroupRange keyGroupRange) { - return new InMemoryStateChangelogWriter(); + return new InMemoryStateChangelogWriter(keyGroupRange); } }
