curcur commented on code in PR #21405:
URL: https://github.com/apache/flink/pull/21405#discussion_r1041764339


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java:
##########
@@ -26,14 +26,21 @@
 @Internal
 public class StateChange implements Serializable {
 
+    /* For metadata, see FLINK-23035.*/
+    public static final int COMMON_KEY_GROUP = -1;

Review Comment:
   Could this be renamed to META_KEY_GROUP?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##########
@@ -167,6 +167,20 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         this.localChangelogRegistry = localChangelogRegistry;
     }
 
+    @Override
+    public void append(byte[] value) throws IOException {
+        LOG.trace("append metadata to {}: {} bytes", logId, value.length);
+        checkState(!closed, "%s is closed", logId);
+        activeChangeSet.add(new StateChange(value));
+        activeChangeSetSize += value.length;
+        if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
+            LOG.debug(
+                    "pre-emptively flush {}MB of appended changes to the common store",
+                    activeChangeSetSize / 1024 / 1024);
+            persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey());
+        }
+    }

Review Comment:
   There is a lot of duplicated code between the two "append" methods.
   
   Please extract the common part into a shared helper and rename this method to, for example, "appendMeta".
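
One possible shape for that refactoring is sketched below. It is a simplified, standalone illustration and not the actual Flink code: `ChangelogWriterSketch`, `Change`, `appendInternal`, `pendingChanges()` and `persistCalls()` are hypothetical names, the signature `append(int keyGroup, byte[] value)` for the existing method is an assumption, and `persistInternal()` is reduced to a stub that counts calls and clears the buffer. It uses `META_KEY_GROUP` as suggested in the first comment.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the writer; hypothetical, not the real Flink class. */
class ChangelogWriterSketch {

    /** Key group used for metadata changes (the name proposed in the review). */
    static final int META_KEY_GROUP = -1;

    /** Hypothetical minimal stand-in for StateChange. */
    static final class Change {
        final int keyGroup;
        final byte[] value;

        Change(int keyGroup, byte[] value) {
            this.keyGroup = keyGroup;
            this.value = value;
        }
    }

    private final List<Change> activeChangeSet = new ArrayList<>();
    private final long preEmptivePersistThresholdInBytes;
    private long activeChangeSetSize;
    private int persistCalls;
    private boolean closed;

    ChangelogWriterSketch(long preEmptivePersistThresholdInBytes) {
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
    }

    /** Appends a regular state change for the given key group (assumed signature). */
    public void append(int keyGroup, byte[] value) {
        appendInternal(new Change(keyGroup, value));
    }

    /** Appends a metadata change; only the construction of the change differs. */
    public void appendMeta(byte[] value) {
        appendInternal(new Change(META_KEY_GROUP, value));
    }

    /** Shared part: state check, buffering, size accounting, pre-emptive flush. */
    private void appendInternal(Change change) {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        activeChangeSet.add(change);
        activeChangeSetSize += change.value.length;
        if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
            persistInternal();
        }
    }

    /** Stub for the real upload logic: counts the call and resets the buffer. */
    private void persistInternal() {
        persistCalls++;
        activeChangeSet.clear();
        activeChangeSetSize = 0;
    }

    public void close() {
        closed = true;
    }

    int pendingChanges() {
        return activeChangeSet.size();
    }

    int persistCalls() {
        return persistCalls;
    }
}
```

With this layout, both public methods differ only in how they build the change object, so the closed check and the threshold logic live in one place.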


