fredia commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1138645827
##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -455,12 +467,23 @@ private SnapshotResult<ChangelogStateBackendHandle>
buildSnapshotResult(
checkpointId,
changelogStateBackendStateCopy.materializationID,
persistedSizeOfThisCheckpoint);
- return SnapshotResult.withLocalState(
- jmHandle,
+ ChangelogStateBackendLocalHandle localHandle =
new ChangelogStateBackendLocalHandle(
changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
localDeltaCopy,
- jmHandle));
+ jmHandle);
+ // register local handle to localRegistry
+ for (ChangelogStateHandle handle : localDeltaCopy) {
+ if (handle instanceof ChangelogStateHandleStreamImpl) {
+ ((ChangelogStateHandleStreamImpl) handle)
+ .getHandlesAndOffsets()
+ .forEach(
+ tuple ->
+ localChangelogRegistry.register(
Review Comment:
The registration that used to happen in `confirm()` has been moved to
`ChangelogKeyedStateBackend`/`persist()`, so during one checkpoint each local
handle is now registered only once.

This change affects `LocalChangelogRegistryImpl#prune()`. Before this change,
`prune()` could not cause a `fileNotFound` error, because on the TM
`notifyAbort()` can never arrive after `notifyComplete()` (the
lastUsedCheckpointId of the previous checkpoint was only updated in
`notifyComplete()`). I fixed this issue in the third commit
(https://github.com/apache/flink/pull/21822/commits/de4b8003e2b3ef44d03fcad9b06920155b1e2185);
please take another look, thanks!
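
To make the bookkeeping discussed here easier to follow, below is a minimal
sketch of a registry that tracks the last checkpoint using each local handle.
It is not the actual `LocalChangelogRegistryImpl`: the class
`SketchLocalRegistry`, its method names, the plain string keys, and the
discard rule (drop entries whose last-used checkpoint is older than the given
one) are all hypothetical simplifications chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in; NOT Flink's LocalChangelogRegistryImpl. */
class SketchLocalRegistry {
    // handle key -> id of the last checkpoint that used the handle
    private final Map<String, Long> lastUsedCheckpointId = new HashMap<>();

    /** Records that a checkpoint uses a handle, keeping the largest id seen. */
    void register(String handleKey, long checkpointId) {
        lastUsedCheckpointId.merge(handleKey, checkpointId, Math::max);
    }

    /** Assumed rule: drop handles whose last use is older than the given checkpoint. */
    List<String> discardBefore(long checkpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedCheckpointId.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < checkpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }

    /** Returns true while the handle is still tracked (i.e. not yet discarded). */
    boolean contains(String handleKey) {
        return lastUsedCheckpointId.containsKey(handleKey);
    }
}
```

In this sketch, a handle registered once for checkpoint 1 is dropped by
`discardBefore(2)`, while a handle registered for checkpoint 2 survives. A
handle that a later checkpoint still needs but that was never re-registered
would be dropped too, which is the kind of interaction between registration
timing and pruning that the comment above is concerned with.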