curcur commented on code in PR #21405: URL: https://github.com/apache/flink/pull/21405#discussion_r1041764339
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java: ########## @@ -26,14 +26,21 @@ @Internal public class StateChange implements Serializable { + /* For metadata, see FLINK-23035.*/ + public static final int COMMON_KEY_GROUP = -1; Review Comment: Rename this to META_KEY_GROUP? ########## flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java: ########## @@ -167,6 +167,20 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl this.localChangelogRegistry = localChangelogRegistry; } + @Override + public void append(byte[] value) throws IOException { + LOG.trace("append metadata to {}: {} bytes", logId, value.length); + checkState(!closed, "%s is closed", logId); + activeChangeSet.add(new StateChange(value)); + activeChangeSetSize += value.length; + if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) { + LOG.debug( + "pre-emptively flush {}MB of appended changes to the common store", + activeChangeSetSize / 1024 / 1024); + persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey()); + } + } Review Comment: There is a lot of duplicated code between the two "append" methods. Please extract the two common parts and rename this to "appendMeta", for example. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org