klion26 commented on code in PR #21050:
URL: https://github.com/apache/flink/pull/21050#discussion_r997655058


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -95,60 +95,78 @@ public StreamStateHandle registerReference(
             entry = registeredStates.get(registrationKey);
 
             if (entry == null) {
-                // Additional check that should never fail, because only state 
handles that are not
-                // placeholders should
-                // ever be inserted to the registry.
                 checkState(
-                        !isPlaceholder(state),
+                        !isPlaceholder(newHandle),
                         "Attempt to reference unknown state: " + 
registrationKey);
 
-                entry = new SharedStateEntry(state, checkpointID);
+                LOG.trace(
+                        "Registered new shared state {} under key {}.", 
newHandle, registrationKey);
+                entry = new SharedStateEntry(newHandle, checkpointID);
                 registeredStates.put(registrationKey, entry);
-                LOG.trace("Registered new shared state {} under key {}.", 
entry, registrationKey);
 
-            } else {
-                // Delete if this is a real duplicate.
-                // Note that task (backend) is not required to re-upload state
-                // if the confirmation notification was missing.
-                // However, it's also not required to use exactly the same 
handle or placeholder
-                if (!Objects.equals(state, entry.stateHandle)) {
-                    if (entry.confirmed || isPlaceholder(state)) {
-                        scheduledStateDeletion = state;
-                    } else {
-                        // Old entry is not in a confirmed checkpoint yet, and 
the new one differs.
-                        // This might result from (omitted KG range here for 
simplicity):
-                        // 1. Flink recovers from a failure using a checkpoint 
1
-                        // 2. State Backend is initialized to UID xyz and a 
set of SST: { 01.sst }
-                        // 3. JM triggers checkpoint 2
-                        // 4. TM sends handle: "xyz-002.sst"; JM registers it 
under "xyz-002.sst"
-                        // 5. TM crashes; everything is repeated from (2)
-                        // 6. TM recovers from CP 1 again: backend UID "xyz", 
SST { 01.sst }
-                        // 7. JM triggers checkpoint 3
-                        // 8. TM sends NEW state "xyz-002.sst"
-                        // 9. JM discards it as duplicate
-                        // 10. checkpoint completes, but a wrong SST file is 
used
-                        // So we use a new entry and discard the old one:
-                        scheduledStateDeletion = entry.stateHandle;
-                        entry.stateHandle = state;
-                    }
-                    LOG.trace(
-                            "Identified duplicate state registration under key 
{}. New state {} was determined to "
-                                    + "be an unnecessary copy of existing 
state {} and will be dropped.",
-                            registrationKey,
-                            state,
-                            entry.stateHandle);
-                }
+                // no further handling
+                return entry.stateHandle;
+
+            } else if (entry.stateHandle == newHandle) {
+                // might be a bug but state backend is not required to use a 
place-holder
+                LOG.debug(
+                        "Duplicated registration under key {} with the same 
object: {}",
+                        registrationKey,
+                        newHandle);
+            } else if (Objects.equals(entry.stateHandle, newHandle)) {
+                // might be a bug but state backend is not required to use a 
place-holder
+                LOG.debug(
+                        "Duplicated registration under key {} with the new 
object: {}.",
+                        registrationKey,
+                        newHandle);
+            } else if (isPlaceholder(newHandle)) {
                 LOG.trace(
-                        "Updating last checkpoint for {} from {} to {}",
+                        "Duplicated registration under key {} with a 
placeholder (normal case)",
+                        registrationKey);
+                scheduledStateDeletion = newHandle;
+            } else if (entry.confirmed) {
+                LOG.info(
+                        "Duplicated registration under key {} of a new state: 
{}. "
+                                + "This might happen if checkpoint 
confirmation was delayed and state backend re-uploaded the state. "
+                                + "Discarding the new state and keeping the 
old one which is included into a completed checkpoint",
                         registrationKey,
-                        entry.lastUsedCheckpointID,
-                        checkpointID);
-                entry.advanceLastUsingCheckpointID(checkpointID);
-                if (preventDiscardingCreatedCheckpoint) {
-                    entry.preventDiscardingCreatedCheckpoint();
-                }
+                        newHandle);
+                scheduledStateDeletion = newHandle;
+            } else {
+                // Old entry is not in a confirmed checkpoint yet, and the new 
one differs.
+                // This might result from (omitted KG range here for 
simplicity):
+                // 1. Flink recovers from a failure using a checkpoint 1
+                // 2. State Backend is initialized to UID xyz and a set of 
SST: { 01.sst }
+                // 3. JM triggers checkpoint 2
+                // 4. TM sends handle: "xyz-002.sst"; JM registers it under 
"xyz-002.sst"
+                // 5. TM crashes; everything is repeated from (2)
+                // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 
01.sst }
+                // 7. JM triggers checkpoint 3
+                // 8. TM sends NEW state "xyz-002.sst"
+                // 9. JM discards it as duplicate
+                // 10. checkpoint completes, but a wrong SST file is used
+                // So we use a new entry and discard the old one:
+                LOG.info(

Review Comment:
   IIUC, we only use placeholder state handle if the checkpoint contains sst 
that *has been confirmed*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to