This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b357b4c555 [FLINK-23035][state/changelog] Add explicit append() to StateChangelogWriter to write metadata
6b357b4c555 is described below

commit 6b357b4c55569d4b4b55f101287475a73a2a209d
Author: fredia <[email protected]>
AuthorDate: Mon Nov 28 11:33:24 2022 +0800

    [FLINK-23035][state/changelog] Add explicit append() to StateChangelogWriter to write metadata
---
 .../flink/changelog/fs/FsStateChangelogWriter.java | 28 +++++++++++-----
 .../flink/changelog/fs/StateChangeFormat.java      | 37 +++++++++++++++-------
 .../fs/BatchingStateChangeUploadSchedulerTest.java |  2 +-
 .../metadata/MetadataV2V3SerializerBase.java       |  7 +++-
 .../flink/runtime/state/changelog/StateChange.java | 21 ++++++++++--
 .../state/changelog/StateChangelogWriter.java      |  3 ++
 .../inmemory/InMemoryStateChangelogWriter.java     | 22 ++++++++++++-
 .../state/changelog/AbstractStateChangeLogger.java |  6 +---
 .../state/changelog/StateChangeLoggerTestBase.java |  9 ++++--
 9 files changed, 102 insertions(+), 33 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index 2481fdcc2e4..f72c50a9d88 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -167,18 +167,20 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         this.localChangelogRegistry = localChangelogRegistry;
     }
 
+    @Override
+    public void appendMeta(byte[] value) throws IOException {
+        LOG.trace("append metadata to {}: {} bytes", logId, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(StateChange.ofMetadataChange(value));
+        preEmptiveFlushIfNeeded(value);
+    }
+
     @Override
     public void append(int keyGroup, byte[] value) throws IOException {
         LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
         checkState(!closed, "%s is closed", logId);
-        activeChangeSet.add(new StateChange(keyGroup, value));
-        activeChangeSetSize += value.length;
-        if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
-            LOG.debug(
-                    "pre-emptively flush {}MB of appended changes to the common store",
-                    activeChangeSetSize / 1024 / 1024);
-            persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey());
-        }
+        activeChangeSet.add(StateChange.ofDataChange(keyGroup, value));
+        preEmptiveFlushIfNeeded(value);
     }
 
     @Override
@@ -208,6 +210,16 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         return persistInternal(from);
     }
 
+    private void preEmptiveFlushIfNeeded(byte[] value) throws IOException {
+        activeChangeSetSize += value.length;
+        if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
+            LOG.debug(
+                    "pre-emptively flush {}MB of appended changes to the common store",
+                    activeChangeSetSize / 1024 / 1024);
+            persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey());
+        }
+    }
+
     private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistInternal(
             SequenceNumber from) throws IOException {
         ensureCanPersist(from);
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index 58d142305cd..3c4d5cb7835 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -36,10 +36,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import static java.util.Comparator.comparing;
+import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
 
 /** Serialization format for state changes. */
 @Internal
@@ -68,16 +68,27 @@ public class StateChangeFormat {
         // write in groups to output kg id only once
         Map<Integer, List<StateChange>> byKeyGroup =
                 changes.stream().collect(Collectors.groupingBy(StateChange::getKeyGroup));
-        // sort groups to output metadata first (see StateChangeLoggerImpl.COMMON_KEY_GROUP)
-        Map<Integer, List<StateChange>> sorted = new TreeMap<>(byKeyGroup);
-        output.writeInt(sorted.size());
-        for (Map.Entry<Integer, List<StateChange>> entry : sorted.entrySet()) {
-            output.writeInt(entry.getValue().size());
-            output.writeInt(entry.getKey());
-            for (StateChange stateChange : entry.getValue()) {
-                output.writeInt(stateChange.getChange().length);
-                output.write(stateChange.getChange());
-            }
+        // write the number of key groups
+        output.writeInt(byKeyGroup.size());
+        // output metadata first (see StateChange.META_KEY_GROUP)
+        List<StateChange> meta = byKeyGroup.remove(META_KEY_GROUP);
+        if (meta != null) {
+            writeChangeSetOfKG(output, META_KEY_GROUP, meta);
+        }
+        // output changeSets
+        for (Map.Entry<Integer, List<StateChange>> entry : byKeyGroup.entrySet()) {
+            writeChangeSetOfKG(output, entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void writeChangeSetOfKG(
+            DataOutputViewStreamWrapper output, int keyGroup, List<StateChange> stateChanges)
+            throws IOException {
+        output.writeInt(stateChanges.size());
+        output.writeInt(keyGroup);
+        for (StateChange stateChange : stateChanges) {
+            output.writeInt(stateChange.getChange().length);
+            output.write(stateChange.getChange());
         }
     }
 
@@ -124,7 +135,9 @@ public class StateChangeFormat {
                 int size = input.readInt();
                 byte[] bytes = new byte[size];
                 IOUtils.readFully(input, bytes, 0, size);
-                return new StateChange(keyGroup, bytes);
+                return keyGroup == META_KEY_GROUP
+                        ? StateChange.ofMetadataChange(bytes)
+                        : StateChange.ofDataChange(keyGroup, bytes);
             }
 
             @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index d70c3f63082..c2252b19606 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -364,7 +364,7 @@ class BatchingStateChangeUploadSchedulerTest {
                 new StateChangeSet(
                         UUID.randomUUID(),
                         SequenceNumber.of(0),
-                        singletonList(new StateChange(0, change))));
+                        singletonList(StateChange.ofDataChange(0, change))));
     }
 
     private static void withStore(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 37f22ee95ad..8f5be0f46a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -74,6 +74,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.UNKNOWN_CHECKPOINTED_SIZE;
+import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
 
 /**
  * Base (De)serializer for checkpoint metadata format version 2 and 3.
@@ -494,7 +495,11 @@ public abstract class MetadataV2V3SerializerBase {
                 int bytesSize = dis.readInt();
                 byte[] bytes = new byte[bytesSize];
                 IOUtils.readFully(dis, bytes, 0, bytesSize);
-                changes.add(new StateChange(keyGroup, bytes));
+                StateChange stateChange =
+                        keyGroup == META_KEY_GROUP
+                                ? StateChange.ofMetadataChange(bytes)
+                                : StateChange.ofDataChange(keyGroup, bytes);
+                changes.add(stateChange);
             }
             StateHandleID stateHandleId = new StateHandleID(dis.readUTF());
             return InMemoryChangelogStateHandle.restore(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
index 88598444538..d9386f7844e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
@@ -26,18 +26,33 @@ import java.io.Serializable;
 @Internal
 public class StateChange implements Serializable {
 
+    /* For metadata, see FLINK-23035.*/
+    public static final int META_KEY_GROUP = -1;
+
     private static final long serialVersionUID = 1L;
 
     private final int keyGroup;
     private final byte[] change;
 
-    public StateChange(int keyGroup, byte[] change) {
-        // todo: enable check in FLINK-23035
-        // Preconditions.checkArgument(keyGroup >= 0);
+    StateChange(byte[] change) {
+        this.keyGroup = META_KEY_GROUP;
+        this.change = Preconditions.checkNotNull(change);
+    }
+
+    StateChange(int keyGroup, byte[] change) {
+        Preconditions.checkArgument(keyGroup >= 0);
         this.keyGroup = keyGroup;
         this.change = Preconditions.checkNotNull(change);
     }
 
+    public static StateChange ofMetadataChange(byte[] change) {
+        return new StateChange(change);
+    }
+
+    public static StateChange ofDataChange(int keyGroup, byte[] change) {
+        return new StateChange(keyGroup, change);
+    }
+
     @Override
     public String toString() {
         return String.format("keyGroup=%d, dataSize=%d", keyGroup, change.length);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 756528139d2..337047d6d05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -36,6 +36,9 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
      */
     SequenceNumber nextSequenceNumber();
 
+    /** Appends the provided **metadata** to this log. No persistency guarantees. */
+    void appendMeta(byte[] value) throws IOException;
+
     /** Appends the provided data to this log. No persistency guarantees. */
     void append(int keyGroup, byte[] value) throws IOException;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index eb07ba47716..7fdb3307640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -41,6 +42,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
 
 @NotThreadSafe
 class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChangelogStateHandle> {
@@ -57,6 +59,16 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
         this.keyGroupRange = keyGroupRange;
     }
 
+    @Override
+    public void appendMeta(byte[] value) throws IOException {
+        Preconditions.checkState(!closed, "LogWriter is closed");
+        LOG.trace("append metadata: {} bytes", value.length);
+        changesByKeyGroup
+                .computeIfAbsent(META_KEY_GROUP, unused -> new TreeMap<>())
+                .put(sqn, value);
+        sqn = sqn.next();
+    }
+
     @Override
     public void append(int keyGroup, byte[] value) {
         Preconditions.checkState(!closed, "LogWriter is closed");
@@ -96,8 +108,16 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
 
     private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream(
             NavigableMap<SequenceNumber, byte[]> changeMap, SequenceNumber after, int keyGroup) {
+        if (keyGroup == META_KEY_GROUP) {
+            return changeMap.tailMap(after, true).entrySet().stream()
+                    .map(e2 -> Tuple2.of(e2.getKey(), StateChange.ofMetadataChange(e2.getValue())));
+        }
         return changeMap.tailMap(after, true).entrySet().stream()
-                .map(e2 -> Tuple2.of(e2.getKey(), new StateChange(keyGroup, e2.getValue())));
+                .map(
+                        e2 ->
+                                Tuple2.of(
+                                        e2.getKey(),
+                                        StateChange.ofDataChange(keyGroup, e2.getValue())));
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index c3f4abe5eee..1f84a912284 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -50,7 +50,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 abstract class AbstractStateChangeLogger<Key, Value, Ns>
         implements StateChangeLogger<Value, Ns>, Closeable {
-    static final int COMMON_KEY_GROUP = -1;
     protected final StateChangelogWriter<?> stateChangelogWriter;
     protected final InternalKeyContext<Key> keyContext;
     protected RegisteredStateMetaInfoBase metaInfo;
@@ -158,10 +157,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
 
     private void logMetaIfNeeded() throws IOException {
         if (!metaDataWritten) {
-            // todo: add StateChangelogWriter.append() version without a keygroup
-            //     when all callers and implementers are merged (FLINK-21356 or later)
-            stateChangelogWriter.append(
-                    COMMON_KEY_GROUP,
+            stateChangelogWriter.appendMeta(
                     serializeRaw(
                             out -> {
                                 out.writeByte(METADATA.getCode());
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index 9813409475f..e00b85c1ff8 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -31,7 +31,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP;
+import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
 import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
 import static org.junit.Assert.assertEquals;
 
@@ -45,7 +45,7 @@ abstract class StateChangeLoggerTestBase<Namespace> {
 
         try (StateChangeLogger<String, Namespace> logger = getLogger(writer, keyContext)) {
             List<Tuple2<Integer, StateChangeOperation>> expectedAppends = new ArrayList<>();
-            expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA));
+            expectedAppends.add(Tuple2.of(META_KEY_GROUP, METADATA));
 
             // log every applicable operations, several times each
             int numOpTypes = StateChangeOperation.values().length;
@@ -104,6 +104,11 @@ abstract class StateChangeLoggerTestBase<Namespace> {
     protected static class TestingStateChangelogWriter implements StateChangelogWriter {
         private final List<Tuple2<Integer, StateChangeOperation>> appends = new ArrayList<>();
 
+        @Override
+        public void appendMeta(byte[] value) {
+            appends.add(Tuple2.of(META_KEY_GROUP, StateChangeOperation.byCode(value[0])));
+        }
+
         @Override
         public void append(int keyGroup, byte[] value) {
             appends.add(Tuple2.of(keyGroup, StateChangeOperation.byCode(value[0])));

Reply via email to