This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 585591284cc2f6c539269ce4461b0bb57579da74 Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Jun 2 17:44:21 2021 +0200 [FLINK-21356][state/changelog] Serialize handles implementations Add ability to serialize metadata for in-memory and file implementations of changelog handles --- .../metadata/MetadataV2V3SerializerBase.java | 64 ++++++++++++++++++++++ .../changelog/StateChangelogHandleStreamImpl.java | 34 ++++++++++-- .../inmemory/InMemoryStateChangelogHandle.java | 51 +++++++++++------ .../inmemory/InMemoryStateChangelogWriter.java | 27 +++++---- 4 files changed, 144 insertions(+), 32 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index bb19fa1..886ab37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint.metadata; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; @@ -37,6 +38,9 @@ import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; +import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle; @@ -93,6 +97,8 @@ public abstract class MetadataV2V3SerializerBase { private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5; private static final byte RELATIVE_STREAM_STATE_HANDLE = 6; private static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 7; + private static final byte CHANGELOG_BYTE_INCREMENT_HANDLE = 9; + private static final byte CHANGELOG_FILE_INCREMENT_HANDLE = 10; // ------------------------------------------------------------------------ // (De)serialization entry points @@ -308,6 +314,34 @@ public abstract class MetadataV2V3SerializerBase { serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos); serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); + + } else if (stateHandle instanceof InMemoryStateChangelogHandle) { + InMemoryStateChangelogHandle handle = (InMemoryStateChangelogHandle) stateHandle; + dos.writeByte(CHANGELOG_BYTE_INCREMENT_HANDLE); + dos.writeLong(handle.getFrom()); + dos.writeLong(handle.getTo()); + List<StateChange> list = new ArrayList<>(); + handle.getChanges(null).forEachRemaining(list::add); + dos.writeInt(list.size()); + for (StateChange change : list) { + dos.writeInt(change.getKeyGroup()); + dos.writeInt(change.getChange().length); + dos.write(change.getChange()); + } + + } else if (stateHandle instanceof StateChangelogHandleStreamImpl) { + StateChangelogHandleStreamImpl handle = (StateChangelogHandleStreamImpl) stateHandle; + dos.writeByte(CHANGELOG_FILE_INCREMENT_HANDLE); + dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup()); + dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups()); + dos.writeInt(handle.getHandlesAndOffsets().size()); + for (Tuple2<StreamStateHandle, Long> streamHandleAndOffset : + handle.getHandlesAndOffsets()) { + dos.writeLong(streamHandleAndOffset.f1); + serializeStreamStateHandle(streamHandleAndOffset.f0, dos); + } + dos.writeLong(handle.getStateSize()); + } else { throw new IllegalStateException( "Unknown KeyedStateHandle type: " + stateHandle.getClass()); @@ -370,6 +404,36 @@ public abstract class MetadataV2V3SerializerBase { sharedStates, privateStates, metaDataStateHandle); + + } else if (CHANGELOG_BYTE_INCREMENT_HANDLE == type) { + long from = dis.readLong(); + long to = dis.readLong(); + int size = dis.readInt(); + List<StateChange> changes = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + int keyGroup = dis.readInt(); + int bytesSize = dis.readInt(); + byte[] bytes = new byte[bytesSize]; + dis.read(bytes); + changes.add(new StateChange(keyGroup, bytes)); + } + return new InMemoryStateChangelogHandle(changes, from, to); + + } else if (CHANGELOG_FILE_INCREMENT_HANDLE == type) { + int start = dis.readInt(); + int numKeyGroups = dis.readInt(); + KeyGroupRange keyGroupRange = KeyGroupRange.of(start, start + numKeyGroups - 1); + int numHandles = dis.readInt(); + List<Tuple2<StreamStateHandle, Long>> streamHandleAndOffset = + new ArrayList<>(numHandles); + for (int i = 0; i < numHandles; i++) { + long o = dis.readLong(); + StreamStateHandle h = deserializeStreamStateHandle(dis, context); + streamHandleAndOffset.add(Tuple2.of(h, o)); + } + long size = dis.readLong(); + return new StateChangelogHandleStreamImpl(streamHandleAndOffset, keyGroupRange, size); + } else { throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java index a8ab78f..16a4e60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; @@ -29,28 +30,45 @@ import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; /** {@link StateChangelogHandle} implementation based on {@link StreamStateHandle}. */ @Internal public final class StateChangelogHandleStreamImpl implements StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> { private static final long serialVersionUID = -8070326169926626355L; + private static final Logger LOG = LoggerFactory.getLogger(StateChangelogHandleStreamImpl.class); private final KeyGroupRange keyGroupRange; /** NOTE: order is important as it reflects the order of changes. */ private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets; private transient SharedStateRegistry stateRegistry; + private final long size; public StateChangelogHandleStreamImpl( - List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, KeyGroupRange keyGroupRange) { + List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, + KeyGroupRange keyGroupRange, + long size) { this.handlesAndOffsets = handlesAndOffsets; this.keyGroupRange = keyGroupRange; + this.size = size; + } + + public StateChangelogHandleStreamImpl( + List<Tuple3<StreamStateHandle, Long, Long>> sorted, KeyGroupRange keyGroupRange) { + this( + sorted.stream().map(t -> Tuple2.of(t.f0, t.f1)).collect(Collectors.toList()), + keyGroupRange, + sorted.stream().mapToLong(t1 -> t1.f2).sum()); } @Override @@ -74,7 +92,7 @@ public final class StateChangelogHandleStreamImpl if (offsets.getNumberOfKeyGroups() == 0) { return null; } - return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets); + return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets, 0L /* unknown */); } @Override @@ -99,10 +117,12 @@ public final class StateChangelogHandleStreamImpl private void advance() { while (!current.hasNext() && handleIterator.hasNext()) { - Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next(); try { + current.close(); + Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next(); + LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0); current = reader.read(tuple2.f0, tuple2.f1); - } catch (IOException e) { + } catch (Exception e) { ExceptionUtils.rethrow(e); } } @@ -123,7 +143,7 @@ public final class StateChangelogHandleStreamImpl @Override public long getStateSize() { - return 0; + return size; } private static SharedStateRegistryKey getKey(StreamStateHandle stateHandle) { @@ -146,4 +166,8 @@ public final class StateChangelogHandleStreamImpl CloseableIterator<StateChange> read(StreamStateHandle handle, long offset) throws IOException; } + + public List<Tuple2<StreamStateHandle, Long>> getHandlesAndOffsets() { + return handlesAndOffsets; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java index e43a2ca..e71cac2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java @@ -17,9 +17,11 @@ package org.apache.flink.runtime.state.changelog.inmemory; +import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.util.CloseableIterator; @@ -27,19 +29,26 @@ import org.apache.flink.util.CloseableIterator; import javax.annotation.Nullable; import java.util.List; -import java.util.Map; -import java.util.stream.Stream; -import static java.util.stream.Collectors.toList; - -class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> { +/** In-memory {@link StateChangelogHandle}. */ +@Internal +public class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> { private static final long serialVersionUID = 1L; - private final Map<Integer, List<byte[]>> changes; + private final List<StateChange> changes; + private final SequenceNumber from; // for debug purposes + private final SequenceNumber to; // for debug purposes + + public InMemoryStateChangelogHandle(List<StateChange> changes, long from, long to) { + this(changes, SequenceNumber.of(from), SequenceNumber.of(to)); + } - public InMemoryStateChangelogHandle(Map<Integer, List<byte[]>> changes) { + public InMemoryStateChangelogHandle( + List<StateChange> changes, SequenceNumber from, SequenceNumber to) { this.changes = changes; + this.from = from; + this.to = to; } @Override @@ -47,19 +56,12 @@ class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> { @Override public long getStateSize() { - return 0; + return changes.stream().mapToLong(change -> change.getChange().length).sum(); } @Override public CloseableIterator<StateChange> getChanges(Void unused) { - return CloseableIterator.fromList( - changes.entrySet().stream().flatMap(this::mapEntryToChangeStream).collect(toList()), - change -> {}); - } - - private Stream<StateChange> mapEntryToChangeStream(Map.Entry<Integer, List<byte[]>> entry) { - int keyGroup = entry.getKey(); - return entry.getValue().stream().map(bytes -> new StateChange(keyGroup, bytes)); + return CloseableIterator.fromList(changes, change -> {}); } @Override @@ -70,11 +72,26 @@ class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> { @Nullable @Override public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { - throw new UnsupportedOperationException(); + return changes.stream().mapToInt(StateChange::getKeyGroup).anyMatch(keyGroupRange::contains) + ? this + : null; } @Override public void registerSharedStates(SharedStateRegistry stateRegistry) { throw new UnsupportedOperationException(); } + + @Override + public String toString() { + return String.format("from %s to %s: %s", from, to, changes); + } + + public long getFrom() { + return ((SequenceNumber.GenericSequenceNumber) from).number; + } + + public long getTo() { + return ((SequenceNumber.GenericSequenceNumber) to).number; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index 5ab68e7..cb5e953 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -17,7 +17,9 @@ package org.apache.flink.runtime.state.changelog.inmemory; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.changelog.SequenceNumber; +import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.util.Preconditions; @@ -26,16 +28,17 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.NotThreadSafe; -import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.stream.Collectors.toMap; @NotThreadSafe class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryStateChangelogHandle> { @@ -64,17 +67,21 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryState public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber from) { LOG.debug("Persist after {}", from); Preconditions.checkNotNull(from); - return completedFuture(new InMemoryStateChangelogHandle(collectChanges(from))); + return completedFuture(new InMemoryStateChangelogHandle(collectChanges(from), from, SequenceNumber.of(sqn))); } - private Map<Integer, List<byte[]>> collectChanges(SequenceNumber after) { + private List<StateChange> collectChanges(SequenceNumber after) { return changesByKeyGroup.entrySet().stream() - .collect( - toMap( - Map.Entry::getKey, - kv -> - new ArrayList<>( - kv.getValue().tailMap(after, true).values()))); + .flatMap(e -> toChangeStream(e.getValue(), after, e.getKey())) + .sorted(Comparator.comparing(sqnAndChange -> sqnAndChange.f0)) + .map(t -> t.f1) + .collect(Collectors.toList()); + } + + private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream( + NavigableMap<SequenceNumber, byte[]> changeMap, SequenceNumber after, int keyGroup) { + return changeMap.tailMap(after, true).entrySet().stream() + .map(e2 -> Tuple2.of(e2.getKey(), new StateChange(keyGroup, e2.getValue()))); } @Override
