fredia commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1138645827


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -455,12 +467,23 @@ private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult(
                             checkpointId,
                             changelogStateBackendStateCopy.materializationID,
                             persistedSizeOfThisCheckpoint);
-            return SnapshotResult.withLocalState(
-                    jmHandle,
+            ChangelogStateBackendLocalHandle localHandle =
                     new ChangelogStateBackendLocalHandle(
                             changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
                             localDeltaCopy,
-                            jmHandle));
+                            jmHandle);
+            // register local handle to localRegistry
+            for (ChangelogStateHandle handle : localDeltaCopy) {
+                if (handle instanceof ChangelogStateHandleStreamImpl) {
+                    ((ChangelogStateHandleStreamImpl) handle)
+                            .getHandlesAndOffsets()
+                            .forEach(
+                                    tuple ->
+                                            localChangelogRegistry.register(

Review Comment:
   The registration that used to happen in `confirm()` has been moved to
`ChangelogKeyedStateBackend/persist()`, so within one checkpoint each local
handle is now registered only once.
   
   This change affects `LocalChangelogRegistryImpl#prune()`. Before this
change, `prune()` could not cause a `fileNotFound`, because on the TM
`notifyAbort()` cannot arrive after `notifyComplete()` (the
lastUsedCheckpointId of the previous checkpoint was only updated on
`notifyComplete()`). I fixed this issue in the third commit
(https://github.com/apache/flink/pull/21822/commits/de4b8003e2b3ef44d03fcad9b06920155b1e2185).
Please take another look, thanks!
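
To make the hazard concrete, here is a toy model of how I read it. This is only a sketch: `ToyLocalRegistry`, its file names, and its method bodies are hypothetical and are not the code of `LocalChangelogRegistryImpl`, nor the fix in the third commit. It assumes the registry keeps one lastUsedCheckpointId per file and that `prune()` deletes files whose lastUsedCheckpointId equals the aborted checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model only (hypothetical); not Flink's LocalChangelogRegistryImpl. */
final class ToyLocalRegistry {
    // file name -> highest checkpoint id that registered this file
    private final Map<String, Long> lastUsedCheckpointId = new HashMap<>();
    // files this toy registry has "deleted", in deletion order
    private final List<String> deleted = new ArrayList<>();

    /** Records that checkpointId uses the file; keeps the highest id seen. */
    void register(String file, long checkpointId) {
        lastUsedCheckpointId.merge(file, checkpointId, Math::max);
    }

    /** Assumed abort handling: drop files last used by the aborted checkpoint. */
    void prune(long abortedCheckpointId) {
        Iterator<Map.Entry<String, Long>> it = lastUsedCheckpointId.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() == abortedCheckpointId) {
                deleted.add(entry.getKey());
                it.remove();
            }
        }
    }

    List<String> deleted() {
        return deleted;
    }

    public static void main(String[] args) {
        ToyLocalRegistry registry = new ToyLocalRegistry();
        // Registration at snapshot time: chk-1 registers the file and later completes.
        registry.register("delta-1", 1L);
        // chk-2 reuses the same file, so lastUsedCheckpointId moves to 2
        // before chk-2 is confirmed.
        registry.register("delta-1", 2L);
        // chk-2 is aborted: the file is dropped although completed chk-1 still needs it.
        registry.prune(2L);
        System.out.println("deleted after abort of chk-2: " + registry.deleted());
    }
}
```

When registration happened only on confirmation, the second `register` call above could not precede an abort of the same checkpoint, which is why the old `prune()` was safe.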
   


