rkhachatryan commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r654955828
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -114,13 +128,33 @@
private final StateChangelogWriter<?> stateChangelogWriter;
+ private long lastCheckpointId = -1L;
+
/** last accessed partitioned state. */
@SuppressWarnings("rawtypes")
private InternalKvState lastState;
/** For caching the last accessed partitioned state. */
private String lastName;
+ /** Updated initially on restore and later upon materialization (after
FLINK-21356). */
+ private final List<KeyedStateHandle> materialized = new ArrayList<>();
+
+ /** Updated initially on restore and later cleared upon materialization
(after FLINK-21356). */
+ private final List<StateChangelogHandle> nonMaterialized = new
ArrayList<>();
Review comment:
1. Checkpoints are incremental so that one is built on top of another.
So both materialized and non-materialized states from the previous checkpoint
must be included if they exist. Without restarts, non-materialized part from
the previous checkpoint can be collected from the writer. But after recovery,
there is a new writer with a new SQNs. So we explicitly include all state from
the previous checkpoint.
(Having a new writer for each attempt was a deliberate design decision to
avoid split-brain-like issues.)
2. I'm not sure I fully understand the question. The reported state is the
conceptually the same, but on TM it's not necessarily built from these two
variables. It can be `materialize` + `nonMaterialized` + `newNonMaterialized`
Does this make sense?
I should probably rename `nonMaterialized` to `restoredNonMaterialized`,
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]