rkhachatryan commented on a change in pull request #16035:
URL: https://github.com/apache/flink/pull/16035#discussion_r642612519
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -87,11 +98,27 @@ protected void log(
ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter,
Ns ns)
throws IOException {
- // todo: log metadata (FLINK-22808)
+ logMetaIfNeeded();
stateChangelogWriter.append(
keyContext.getCurrentKeyGroupIndex(), serialize(op, ns, dataWriter));
}
+ private void logMetaIfNeeded() throws IOException {
+ if (metaDataWritten) {
+ return;
+ }
+ stateChangelogWriter.append(
+ COMMON_KEY_GROUP,
Review comment:
A negative keygroup number is used here so that the writer places the
metadata at the very beginning of its current batch. That way, on recovery,
the metadata is read before the data changes, even if those changes were made
to a smaller keygroup or arrive because of rescaling.
I'd add a version of `append` without `keygroup` to `StateChangelogWriter`,
but maybe there are better alternatives?
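To illustrate the ordering argument, here is a minimal sketch. It assumes the writer emits a batch ordered by keygroup; `KeyGroupOrderingSketch`, its `append`/`flush` methods, and the sentinel value `-1` are hypothetical stand-ins, not the actual `StateChangelogWriter` API or the real `COMMON_KEY_GROUP` value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a writer that emits a batch ordered by keygroup. */
public class KeyGroupOrderingSketch {
    // Assumed sentinel; the real COMMON_KEY_GROUP value is not shown in the diff.
    public static final int COMMON_KEY_GROUP = -1;

    // TreeMap keeps keygroups in ascending order, so a negative key sorts first.
    private final TreeMap<Integer, List<String>> batch = new TreeMap<>();

    public void append(int keyGroup, String change) {
        batch.computeIfAbsent(keyGroup, k -> new ArrayList<>()).add(change);
    }

    /** Returns the changes in the order a reader would see them on recovery. */
    public List<String> flush() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : batch.entrySet()) {
            out.addAll(entry.getValue());
        }
        batch.clear();
        return out;
    }

    public static void main(String[] args) {
        KeyGroupOrderingSketch writer = new KeyGroupOrderingSketch();
        writer.append(5, "data@5");
        writer.append(COMMON_KEY_GROUP, "metadata"); // appended after some data
        writer.append(0, "data@0");
        System.out.println(writer.flush());
    }
}
```

Under that assumption the metadata entry comes out first regardless of when it was appended, because no real keygroup index is negative.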
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -100,6 +127,8 @@ protected void log(
return serializeRaw(
wrapper -> {
wrapper.writeByte(op.code);
+ // todo: wrapper.writeShort(stateId); and/or sort and write once (same for key, ns?)
+ wrapper.writeUTF(metaInfo.getName());
Review comment:
Writing the state name for each change is sub-optimal. There are two
ways to avoid it:
1. write a mapping from state name to some id at the beginning, and write
only the id here
2. sort changes by state name and write the name once
The first option is easier to implement but turned out to be more
error-prone (on recovery).
The second option is more efficient and can be extended to other
dimensions as well (key, namespace).
However, I think it's easier to first get this simpler version integrated
with recovery (and maybe benchmarked) and then apply the optimization.
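A rough sketch of the size difference between the current approach and option 2, using plain `java.io` streams. The `Change` record and both `write*` methods are hypothetical and do not reflect the actual changelog wire format; the grouping also reorders changes across states, which a real implementation would need to account for.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StateNameEncodingSketch {
    /** Hypothetical change record: a state name plus an opaque payload. */
    public static final class Change {
        final String stateName;
        final byte[] payload;

        public Change(String stateName, byte[] payload) {
            this.stateName = stateName;
            this.payload = payload;
        }
    }

    /** Current approach: the state name precedes every change. */
    public static byte[] writeNamePerChange(List<Change> changes) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (Change c : changes) {
            out.writeUTF(c.stateName);
            out.writeInt(c.payload.length);
            out.write(c.payload);
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Option 2: group by state name and write each name once. */
    public static byte[] writeGroupedByName(List<Change> changes) throws IOException {
        // TreeMap sorts by name; each list keeps the per-state order of changes.
        Map<String, List<Change>> byName = new TreeMap<>();
        for (Change c : changes) {
            byName.computeIfAbsent(c.stateName, k -> new ArrayList<>()).add(c);
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (Map.Entry<String, List<Change>> entry : byName.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeInt(entry.getValue().size()); // number of changes for this state
            for (Change c : entry.getValue()) {
                out.writeInt(c.payload.length);
                out.write(c.payload);
            }
        }
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change("counter", new byte[] {1}));
        changes.add(new Change("buffer", new byte[] {2}));
        changes.add(new Change("counter", new byte[] {3}));
        changes.add(new Change("counter", new byte[] {4}));
        System.out.println("name per change: " + writeNamePerChange(changes).length);
        System.out.println("grouped by name: " + writeGroupedByName(changes).length);
    }
}
```

The saving grows with the number of changes per state and with the length of the state name; a benchmark on the real format would be needed to tell whether it matters in practice.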