This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 585591284cc2f6c539269ce4461b0bb57579da74
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jun 2 17:44:21 2021 +0200

    [FLINK-21356][state/changelog] Serialize handles implementations
    
    Add the ability to serialize metadata for the in-memory and file-based
    implementations of changelog handles.
---
 .../metadata/MetadataV2V3SerializerBase.java       | 64 ++++++++++++++++++++++
 .../changelog/StateChangelogHandleStreamImpl.java  | 34 ++++++++++--
 .../inmemory/InMemoryStateChangelogHandle.java     | 51 +++++++++++------
 .../inmemory/InMemoryStateChangelogWriter.java     | 27 +++++----
 4 files changed, 144 insertions(+), 32 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index bb19fa1..886ab37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint.metadata;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -37,6 +38,9 @@ import 
org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
+import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
@@ -93,6 +97,8 @@ public abstract class MetadataV2V3SerializerBase {
     private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
     private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
     private static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 7;
+    // NOTE(review): tag 8 is skipped after SAVEPOINT_KEY_GROUPS_HANDLE = 7 — confirm it is
+    // reserved or defined elsewhere, since these tags are persisted in checkpoint metadata.
+    /** Type tag written before a serialized {@link InMemoryStateChangelogHandle}. */
+    private static final byte CHANGELOG_BYTE_INCREMENT_HANDLE = 9;
+    /** Type tag written before a serialized {@link StateChangelogHandleStreamImpl}. */
+    private static final byte CHANGELOG_FILE_INCREMENT_HANDLE = 10;
 
     // ------------------------------------------------------------------------
     //  (De)serialization entry points
@@ -308,6 +314,34 @@ public abstract class MetadataV2V3SerializerBase {
 
             
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), 
dos);
             
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), 
dos);
+
+        } else if (stateHandle instanceof InMemoryStateChangelogHandle) {
+            // Layout: tag, from (long), to (long), number of changes (int), then for each
+            // change: key group (int), payload length (int), payload bytes.
+            InMemoryStateChangelogHandle handle = 
(InMemoryStateChangelogHandle) stateHandle;
+            dos.writeByte(CHANGELOG_BYTE_INCREMENT_HANDLE);
+            dos.writeLong(handle.getFrom());
+            dos.writeLong(handle.getTo());
+            // materialize the changes first so that their count can be written up front
+            List<StateChange> list = new ArrayList<>();
+            handle.getChanges(null).forEachRemaining(list::add);
+            dos.writeInt(list.size());
+            for (StateChange change : list) {
+                dos.writeInt(change.getKeyGroup());
+                dos.writeInt(change.getChange().length);
+                dos.write(change.getChange());
+            }
+
+        } else if (stateHandle instanceof StateChangelogHandleStreamImpl) {
+            // Layout: tag, key group range as start (int) and count (int), number of
+            // handles (int), then for each handle: offset (long) followed by the serialized
+            // stream handle; finally the total state size (long, 0 if unknown).
+            StateChangelogHandleStreamImpl handle = 
(StateChangelogHandleStreamImpl) stateHandle;
+            dos.writeByte(CHANGELOG_FILE_INCREMENT_HANDLE);
+            dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
+            dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
+            dos.writeInt(handle.getHandlesAndOffsets().size());
+            for (Tuple2<StreamStateHandle, Long> streamHandleAndOffset :
+                    handle.getHandlesAndOffsets()) {
+                dos.writeLong(streamHandleAndOffset.f1);
+                serializeStreamStateHandle(streamHandleAndOffset.f0, dos);
+            }
+            dos.writeLong(handle.getStateSize());
+
         } else {
             throw new IllegalStateException(
                     "Unknown KeyedStateHandle type: " + 
stateHandle.getClass());
@@ -370,6 +404,36 @@ public abstract class MetadataV2V3SerializerBase {
                     sharedStates,
                     privateStates,
                     metaDataStateHandle);
+
+        } else if (CHANGELOG_BYTE_INCREMENT_HANDLE == type) {
+            // Mirrors the serialization layout: from (long), to (long), number of changes
+            // (int), then for each change: key group (int), payload length (int), payload.
+            long from = dis.readLong();
+            long to = dis.readLong();
+            int size = dis.readInt();
+            List<StateChange> changes = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                int keyGroup = dis.readInt();
+                int bytesSize = dis.readInt();
+                byte[] bytes = new byte[bytesSize];
+                // read(byte[]) may return after reading fewer bytes than requested, leaving the
+                // tail zeroed and misaligning the rest of the stream; readFully either fills
+                // the whole array or throws EOFException.
+                dis.readFully(bytes);
+                changes.add(new StateChange(keyGroup, bytes));
+            }
+            return new InMemoryStateChangelogHandle(changes, from, to);
+
+        } else if (CHANGELOG_FILE_INCREMENT_HANDLE == type) {
+            // Mirrors the serialization layout: key group range as start and count, number
+            // of handles, (offset, stream handle) pairs, and finally the total state size.
+            int start = dis.readInt();
+            int numKeyGroups = dis.readInt();
+            KeyGroupRange keyGroupRange = KeyGroupRange.of(start, start + 
numKeyGroups - 1);
+            int numHandles = dis.readInt();
+            List<Tuple2<StreamStateHandle, Long>> streamHandleAndOffset =
+                    new ArrayList<>(numHandles);
+            for (int i = 0; i < numHandles; i++) {
+                long o = dis.readLong();
+                StreamStateHandle h = deserializeStreamStateHandle(dis, 
context);
+                streamHandleAndOffset.add(Tuple2.of(h, o));
+            }
+            long size = dis.readLong();
+            return new StateChangelogHandleStreamImpl(streamHandleAndOffset, 
keyGroupRange, size);
+
         } else {
             throw new IllegalStateException("Reading invalid KeyedStateHandle, 
type: " + type);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
index a8ab78f..16a4e60 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -29,28 +30,45 @@ import 
org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /** {@link StateChangelogHandle} implementation based on {@link 
StreamStateHandle}. */
 @Internal
 public final class StateChangelogHandleStreamImpl
         implements 
StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> {
     private static final long serialVersionUID = -8070326169926626355L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(StateChangelogHandleStreamImpl.class);
 
     private final KeyGroupRange keyGroupRange;
     /** NOTE: order is important as it reflects the order of changes. */
     private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets;
 
     private transient SharedStateRegistry stateRegistry;
+    /** Value reported by {@link #getStateSize()}; 0 means the size is unknown. */
+    private final long size;
 
+    /** Creates a handle over (stream handle, offset) pairs with an explicitly given size. */
     public StateChangelogHandleStreamImpl(
-            List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, 
KeyGroupRange keyGroupRange) {
+            List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets,
+            KeyGroupRange keyGroupRange,
+            long size) {
         this.handlesAndOffsets = handlesAndOffsets;
         this.keyGroupRange = keyGroupRange;
+        this.size = size;
+    }
+
+    /**
+     * Creates a handle from (stream handle, offset, size) triples, preserving their order. The
+     * state size of the handle is the sum of the per-entry sizes.
+     */
+    public StateChangelogHandleStreamImpl(
+            List<Tuple3<StreamStateHandle, Long, Long>> sorted, KeyGroupRange 
keyGroupRange) {
+        this(
+                sorted.stream().map(t -> Tuple2.of(t.f0, 
t.f1)).collect(Collectors.toList()),
+                keyGroupRange,
+                sorted.stream().mapToLong(t1 -> t1.f2).sum());
     }
 
     @Override
@@ -74,7 +92,7 @@ public final class StateChangelogHandleStreamImpl
         if (offsets.getNumberOfKeyGroups() == 0) {
             return null;
         }
-        return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets);
+        return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets, 
0L /* unknown */);
     }
 
     @Override
@@ -99,10 +117,12 @@ public final class StateChangelogHandleStreamImpl
 
             private void advance() {
                 while (!current.hasNext() && handleIterator.hasNext()) {
-                    Tuple2<StreamStateHandle, Long> tuple2 = 
handleIterator.next();
                     try {
+                        // release the exhausted iterator before opening the next handle
+                        current.close();
+                        Tuple2<StreamStateHandle, Long> tuple2 = 
handleIterator.next();
+                        LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
                         current = reader.read(tuple2.f0, tuple2.f1);
-                    } catch (IOException e) {
+                    } catch (Exception e) {
                         ExceptionUtils.rethrow(e);
                     }
                 }
@@ -123,7 +143,7 @@ public final class StateChangelogHandleStreamImpl
 
     @Override
     public long getStateSize() {
-        return 0;
+        // size given at construction; 0 means unknown
+        return size;
     }
 
     private static SharedStateRegistryKey getKey(StreamStateHandle 
stateHandle) {
@@ -146,4 +166,8 @@ public final class StateChangelogHandleStreamImpl
         CloseableIterator<StateChange> read(StreamStateHandle handle, long 
offset)
                 throws IOException;
     }
+
+    /**
+     * Returns the (stream handle, offset) pairs in the order of the changes they contain. The
+     * internal list is returned as-is, not a copy.
+     */
+    public List<Tuple2<StreamStateHandle, Long>> getHandlesAndOffsets() {
+        return handlesAndOffsets;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
index e43a2ca..e71cac2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
@@ -17,9 +17,11 @@
 
 package org.apache.flink.runtime.state.changelog.inmemory;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.util.CloseableIterator;
@@ -27,19 +29,26 @@ import org.apache.flink.util.CloseableIterator;
 import javax.annotation.Nullable;
 
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
 
-import static java.util.stream.Collectors.toList;
-
-class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> {
+/** In-memory {@link StateChangelogHandle}. */
+@Internal
+public class InMemoryStateChangelogHandle implements 
StateChangelogHandle<Void> {
 
     private static final long serialVersionUID = 1L;
 
-    private final Map<Integer, List<byte[]>> changes;
+    private final List<StateChange> changes;
+    private final SequenceNumber from; // for debug purposes; also exposed via getFrom()
+    private final SequenceNumber to; // for debug purposes; also exposed via getTo()
+
+    /** Convenience constructor taking raw sequence number values. */
+    public InMemoryStateChangelogHandle(List<StateChange> changes, long from, 
long to) {
+        this(changes, SequenceNumber.of(from), SequenceNumber.of(to));
+    }
 
-    public InMemoryStateChangelogHandle(Map<Integer, List<byte[]>> changes) {
+    /**
+     * Creates a handle over the given changes, which {@link #getChanges} returns in list order.
+     */
+    public InMemoryStateChangelogHandle(
+            List<StateChange> changes, SequenceNumber from, SequenceNumber to) 
{
         this.changes = changes;
+        this.from = from;
+        this.to = to;
     }
 
     @Override
@@ -47,19 +56,12 @@ class InMemoryStateChangelogHandle implements 
StateChangelogHandle<Void> {
 
     @Override
     public long getStateSize() {
-        return 0;
+        // total payload size of all changes (key group ids are not counted)
+        return changes.stream().mapToLong(change -> 
change.getChange().length).sum();
     }
 
     @Override
     public CloseableIterator<StateChange> getChanges(Void unused) {
-        return CloseableIterator.fromList(
-                
changes.entrySet().stream().flatMap(this::mapEntryToChangeStream).collect(toList()),
-                change -> {});
-    }
-
-    private Stream<StateChange> mapEntryToChangeStream(Map.Entry<Integer, 
List<byte[]>> entry) {
-        int keyGroup = entry.getKey();
-        return entry.getValue().stream().map(bytes -> new 
StateChange(keyGroup, bytes));
+        // changes are already stored as a flat list, so they are returned as-is
+        return CloseableIterator.fromList(changes, change -> {});
     }
 
     @Override
@@ -70,11 +72,26 @@ class InMemoryStateChangelogHandle implements 
StateChangelogHandle<Void> {
     @Nullable
     @Override
     public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-        throw new UnsupportedOperationException();
+        // NOTE(review): the whole handle (including changes of key groups outside
+        // keyGroupRange) is returned as soon as any change falls into the range — confirm.
+        return 
changes.stream().mapToInt(StateChange::getKeyGroup).anyMatch(keyGroupRange::contains)
+                ? this
+                : null;
     }
 
     @Override
     public void registerSharedStates(SharedStateRegistry stateRegistry) {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public String toString() {
+        return String.format("from %s to %s: %s", from, to, changes);
+    }
+
+    /**
+     * Returns the raw value of {@code from}. Assumes it is a {@code GenericSequenceNumber};
+     * any other implementation causes a ClassCastException.
+     */
+    public long getFrom() {
+        return ((SequenceNumber.GenericSequenceNumber) from).number;
+    }
+
+    /**
+     * Returns the raw value of {@code to}. Assumes it is a {@code GenericSequenceNumber};
+     * any other implementation causes a ClassCastException.
+     */
+    public long getTo() {
+        return ((SequenceNumber.GenericSequenceNumber) to).number;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 5ab68e7..cb5e953 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -17,7 +17,9 @@
 
 package org.apache.flink.runtime.state.changelog.inmemory;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.util.Preconditions;
 
@@ -26,16 +28,17 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
-import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toMap;
 
 @NotThreadSafe
 class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryStateChangelogHandle> {
@@ -64,17 +67,21 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryState
     public CompletableFuture<InMemoryStateChangelogHandle> 
persist(SequenceNumber from) {
         LOG.debug("Persist after {}", from);
         Preconditions.checkNotNull(from);
-        return completedFuture(new 
InMemoryStateChangelogHandle(collectChanges(from)));
+        // 'from' is inclusive (see collectChanges); 'to' is taken from the writer's sqn field
+        return completedFuture(new 
InMemoryStateChangelogHandle(collectChanges(from), from, 
SequenceNumber.of(sqn)));
     }
 
-    private Map<Integer, List<byte[]>> collectChanges(SequenceNumber after) {
+    /**
+     * Collects the changes of all key groups whose sequence number is at or after {@code
+     * after}, ordered by sequence number.
+     */
+    private List<StateChange> collectChanges(SequenceNumber after) {
         return changesByKeyGroup.entrySet().stream()
-                .collect(
-                        toMap(
-                                Map.Entry::getKey,
-                                kv ->
-                                        new ArrayList<>(
-                                                kv.getValue().tailMap(after, 
true).values())));
+                .flatMap(e -> toChangeStream(e.getValue(), after, e.getKey()))
+                .sorted(Comparator.comparing(sqnAndChange -> sqnAndChange.f0))
+                .map(t -> t.f1)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Streams the entries of one key group's change map at or after {@code after} (inclusive),
+     * pairing each sequence number with a {@link StateChange} tagged with {@code keyGroup}.
+     */
+    private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream(
+            NavigableMap<SequenceNumber, byte[]> changeMap, SequenceNumber 
after, int keyGroup) {
+        return changeMap.tailMap(after, true).entrySet().stream()
+                .map(e2 -> Tuple2.of(e2.getKey(), new StateChange(keyGroup, 
e2.getValue())));
     }
 
     @Override

Reply via email to