This is an automated email from the ASF dual-hosted git repository.
yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6b357b4c555 [FLINK-23035][state/changelog] Add explicit append() to
StateChangelogWriter to write metadata
6b357b4c555 is described below
commit 6b357b4c55569d4b4b55f101287475a73a2a209d
Author: fredia <[email protected]>
AuthorDate: Mon Nov 28 11:33:24 2022 +0800
[FLINK-23035][state/changelog] Add explicit append() to
StateChangelogWriter to write metadata
---
.../flink/changelog/fs/FsStateChangelogWriter.java | 28 +++++++++++-----
.../flink/changelog/fs/StateChangeFormat.java | 37 +++++++++++++++-------
.../fs/BatchingStateChangeUploadSchedulerTest.java | 2 +-
.../metadata/MetadataV2V3SerializerBase.java | 7 +++-
.../flink/runtime/state/changelog/StateChange.java | 21 ++++++++++--
.../state/changelog/StateChangelogWriter.java | 3 ++
.../inmemory/InMemoryStateChangelogWriter.java | 22 ++++++++++++-
.../state/changelog/AbstractStateChangeLogger.java | 6 +---
.../state/changelog/StateChangeLoggerTestBase.java | 9 ++++--
9 files changed, 102 insertions(+), 33 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index 2481fdcc2e4..f72c50a9d88 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -167,18 +167,20 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
this.localChangelogRegistry = localChangelogRegistry;
}
+ @Override
+ public void appendMeta(byte[] value) throws IOException {
+ LOG.trace("append metadata to {}: {} bytes", logId, value.length);
+ checkState(!closed, "%s is closed", logId);
+ activeChangeSet.add(StateChange.ofMetadataChange(value));
+ preEmptiveFlushIfNeeded(value);
+ }
+
@Override
public void append(int keyGroup, byte[] value) throws IOException {
LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup,
value.length);
checkState(!closed, "%s is closed", logId);
- activeChangeSet.add(new StateChange(keyGroup, value));
- activeChangeSetSize += value.length;
- if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
- LOG.debug(
- "pre-emptively flush {}MB of appended changes to the
common store",
- activeChangeSetSize / 1024 / 1024);
- persistInternal(notUploaded.isEmpty() ? activeSequenceNumber :
notUploaded.firstKey());
- }
+ activeChangeSet.add(StateChange.ofDataChange(keyGroup, value));
+ preEmptiveFlushIfNeeded(value);
}
@Override
@@ -208,6 +210,16 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
return persistInternal(from);
}
+ private void preEmptiveFlushIfNeeded(byte[] value) throws IOException {
+ activeChangeSetSize += value.length;
+ if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
+ LOG.debug(
+ "pre-emptively flush {}MB of appended changes to the
common store",
+ activeChangeSetSize / 1024 / 1024);
+ persistInternal(notUploaded.isEmpty() ? activeSequenceNumber :
notUploaded.firstKey());
+ }
+ }
+
private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
persistInternal(
SequenceNumber from) throws IOException {
ensureCanPersist(from);
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index 58d142305cd..3c4d5cb7835 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -36,10 +36,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.TreeMap;
import java.util.stream.Collectors;
import static java.util.Comparator.comparing;
+import static
org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
/** Serialization format for state changes. */
@Internal
@@ -68,16 +68,27 @@ public class StateChangeFormat {
// write in groups to output kg id only once
Map<Integer, List<StateChange>> byKeyGroup =
changes.stream().collect(Collectors.groupingBy(StateChange::getKeyGroup));
- // sort groups to output metadata first (see
StateChangeLoggerImpl.COMMON_KEY_GROUP)
- Map<Integer, List<StateChange>> sorted = new TreeMap<>(byKeyGroup);
- output.writeInt(sorted.size());
- for (Map.Entry<Integer, List<StateChange>> entry : sorted.entrySet()) {
- output.writeInt(entry.getValue().size());
- output.writeInt(entry.getKey());
- for (StateChange stateChange : entry.getValue()) {
- output.writeInt(stateChange.getChange().length);
- output.write(stateChange.getChange());
- }
+ // write the number of key groups
+ output.writeInt(byKeyGroup.size());
+ // output metadata first (see StateChange.META_KEY_GROUP)
+ List<StateChange> meta = byKeyGroup.remove(META_KEY_GROUP);
+ if (meta != null) {
+ writeChangeSetOfKG(output, META_KEY_GROUP, meta);
+ }
+ // output changeSets
+ for (Map.Entry<Integer, List<StateChange>> entry :
byKeyGroup.entrySet()) {
+ writeChangeSetOfKG(output, entry.getKey(), entry.getValue());
+ }
+ }
+
+ private void writeChangeSetOfKG(
+ DataOutputViewStreamWrapper output, int keyGroup,
List<StateChange> stateChanges)
+ throws IOException {
+ output.writeInt(stateChanges.size());
+ output.writeInt(keyGroup);
+ for (StateChange stateChange : stateChanges) {
+ output.writeInt(stateChange.getChange().length);
+ output.write(stateChange.getChange());
}
}
@@ -124,7 +135,9 @@ public class StateChangeFormat {
int size = input.readInt();
byte[] bytes = new byte[size];
IOUtils.readFully(input, bytes, 0, size);
- return new StateChange(keyGroup, bytes);
+ return keyGroup == META_KEY_GROUP
+ ? StateChange.ofMetadataChange(bytes)
+ : StateChange.ofDataChange(keyGroup, bytes);
}
@Override
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index d70c3f63082..c2252b19606 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -364,7 +364,7 @@ class BatchingStateChangeUploadSchedulerTest {
new StateChangeSet(
UUID.randomUUID(),
SequenceNumber.of(0),
- singletonList(new StateChange(0, change))));
+ singletonList(StateChange.ofDataChange(0, change))));
}
private static void withStore(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 37f22ee95ad..8f5be0f46a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -74,6 +74,7 @@ import java.util.Map;
import java.util.UUID;
import static
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.UNKNOWN_CHECKPOINTED_SIZE;
+import static
org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
/**
* Base (De)serializer for checkpoint metadata format version 2 and 3.
@@ -494,7 +495,11 @@ public abstract class MetadataV2V3SerializerBase {
int bytesSize = dis.readInt();
byte[] bytes = new byte[bytesSize];
IOUtils.readFully(dis, bytes, 0, bytesSize);
- changes.add(new StateChange(keyGroup, bytes));
+ StateChange stateChange =
+ keyGroup == META_KEY_GROUP
+ ? StateChange.ofMetadataChange(bytes)
+ : StateChange.ofDataChange(keyGroup, bytes);
+ changes.add(stateChange);
}
StateHandleID stateHandleId = new StateHandleID(dis.readUTF());
return InMemoryChangelogStateHandle.restore(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
index 88598444538..d9386f7844e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
@@ -26,18 +26,33 @@ import java.io.Serializable;
@Internal
public class StateChange implements Serializable {
+ /* For metadata, see FLINK-23035.*/
+ public static final int META_KEY_GROUP = -1;
+
private static final long serialVersionUID = 1L;
private final int keyGroup;
private final byte[] change;
- public StateChange(int keyGroup, byte[] change) {
- // todo: enable check in FLINK-23035
- // Preconditions.checkArgument(keyGroup >= 0);
+ StateChange(byte[] change) {
+ this.keyGroup = META_KEY_GROUP;
+ this.change = Preconditions.checkNotNull(change);
+ }
+
+ StateChange(int keyGroup, byte[] change) {
+ Preconditions.checkArgument(keyGroup >= 0);
this.keyGroup = keyGroup;
this.change = Preconditions.checkNotNull(change);
}
+ public static StateChange ofMetadataChange(byte[] change) {
+ return new StateChange(change);
+ }
+
+ public static StateChange ofDataChange(int keyGroup, byte[] change) {
+ return new StateChange(keyGroup, change);
+ }
+
@Override
public String toString() {
return String.format("keyGroup=%d, dataSize=%d", keyGroup,
change.length);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 756528139d2..337047d6d05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -36,6 +36,9 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
*/
SequenceNumber nextSequenceNumber();
+ /** Appends the provided **metadata** to this log. No persistency
guarantees. */
+ void appendMeta(byte[] value) throws IOException;
+
/** Appends the provided data to this log. No persistency guarantees. */
void append(int keyGroup, byte[] value) throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index eb07ba47716..7fdb3307640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -41,6 +42,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
@NotThreadSafe
class InMemoryStateChangelogWriter implements
StateChangelogWriter<InMemoryChangelogStateHandle> {
@@ -57,6 +59,16 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
this.keyGroupRange = keyGroupRange;
}
+ @Override
+ public void appendMeta(byte[] value) throws IOException {
+ Preconditions.checkState(!closed, "LogWriter is closed");
+ LOG.trace("append metadata: {} bytes", value.length);
+ changesByKeyGroup
+ .computeIfAbsent(META_KEY_GROUP, unused -> new TreeMap<>())
+ .put(sqn, value);
+ sqn = sqn.next();
+ }
+
@Override
public void append(int keyGroup, byte[] value) {
Preconditions.checkState(!closed, "LogWriter is closed");
@@ -96,8 +108,16 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream(
NavigableMap<SequenceNumber, byte[]> changeMap, SequenceNumber
after, int keyGroup) {
+ if (keyGroup == META_KEY_GROUP) {
+ return changeMap.tailMap(after, true).entrySet().stream()
+ .map(e2 -> Tuple2.of(e2.getKey(),
StateChange.ofMetadataChange(e2.getValue())));
+ }
return changeMap.tailMap(after, true).entrySet().stream()
- .map(e2 -> Tuple2.of(e2.getKey(), new StateChange(keyGroup,
e2.getValue())));
+ .map(
+ e2 ->
+ Tuple2.of(
+ e2.getKey(),
+ StateChange.ofDataChange(keyGroup,
e2.getValue())));
}
@Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index c3f4abe5eee..1f84a912284 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -50,7 +50,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeLogger<Value, Ns>, Closeable {
- static final int COMMON_KEY_GROUP = -1;
protected final StateChangelogWriter<?> stateChangelogWriter;
protected final InternalKeyContext<Key> keyContext;
protected RegisteredStateMetaInfoBase metaInfo;
@@ -158,10 +157,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
private void logMetaIfNeeded() throws IOException {
if (!metaDataWritten) {
- // todo: add StateChangelogWriter.append() version without a
keygroup
- // when all callers and implementers are merged (FLINK-21356
or later)
- stateChangelogWriter.append(
- COMMON_KEY_GROUP,
+ stateChangelogWriter.appendMeta(
serializeRaw(
out -> {
out.writeByte(METADATA.getCode());
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index 9813409475f..e00b85c1ff8 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -31,7 +31,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP;
+import static
org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
import static org.junit.Assert.assertEquals;
@@ -45,7 +45,7 @@ abstract class StateChangeLoggerTestBase<Namespace> {
try (StateChangeLogger<String, Namespace> logger = getLogger(writer,
keyContext)) {
List<Tuple2<Integer, StateChangeOperation>> expectedAppends = new
ArrayList<>();
- expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA));
+ expectedAppends.add(Tuple2.of(META_KEY_GROUP, METADATA));
// log every applicable operations, several times each
int numOpTypes = StateChangeOperation.values().length;
@@ -104,6 +104,11 @@ abstract class StateChangeLoggerTestBase<Namespace> {
protected static class TestingStateChangelogWriter implements
StateChangelogWriter {
private final List<Tuple2<Integer, StateChangeOperation>> appends =
new ArrayList<>();
+ @Override
+ public void appendMeta(byte[] value) {
+ appends.add(Tuple2.of(META_KEY_GROUP,
StateChangeOperation.byCode(value[0])));
+ }
+
@Override
public void append(int keyGroup, byte[] value) {
appends.add(Tuple2.of(keyGroup,
StateChangeOperation.byCode(value[0])));