Yun Tang created FLINK-26101:
--------------------------------
Summary: Avoid shared state registry to discard duplicate changelog state
Key: FLINK-26101
URL: https://issues.apache.org/jira/browse/FLINK-26101
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Reporter: Yun Tang
Assignee: Yun Tang
Fix For: 1.15.0
With the changelog state backend, the same materialized keyed state handle is registered multiple times, and {{SharedStateRegistryImpl}} discards the duplicate state handle:
{code:java}
if (!Objects.equals(state, entry.stateHandle)) {
    if (entry.confirmed || isPlaceholder(state)) {
        scheduledStateDeletion = state;
    } else {
        // Old entry is not in a confirmed checkpoint yet, and the new one differs.
        // This might result from (omitted KG range here for simplicity):
        // 1. Flink recovers from a failure using a checkpoint 1
        // 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
        // 3. JM triggers checkpoint 2
        // 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
        // 5. TM crashes; everything is repeated from (2)
        // 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
        // 7. JM triggers checkpoint 3
        // 8. TM sends NEW state "xyz-002.sst"
        // 9. JM discards it as duplicate
        // 10. checkpoint completes, but a wrong SST file is used
        // So we use a new entry and discard the old one:
        scheduledStateDeletion = entry.stateHandle;
        entry.stateHandle = state;
    }
}
{code}
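To make the failure mode concrete, here is a minimal Java sketch of the branch structure quoted above. It assumes hypothetical classes {{RegistrySketch}} and {{IdentityHandle}} and plain string keys; none of these are Flink classes, and placeholder handling and the actual file deletion are omitted. When a handle that describes the same file but is a different object instance is registered again against a confirmed entry, identity-based equality makes it look new, so it is scheduled for deletion even though the confirmed entry still refers to the same file.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical handle that does NOT override equals/hashCode, so
// Objects.equals falls back to reference (identity) comparison.
final class IdentityHandle {
    final String path;

    IdentityHandle(String path) {
        this.path = path;
    }
}

// Hypothetical, heavily simplified model of the registration decision.
// Placeholder handling and the actual deletion are omitted.
final class RegistrySketch {

    private static final class Entry {
        Object stateHandle;
        boolean confirmed;

        Entry(Object stateHandle) {
            this.stateHandle = stateHandle;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    // Registers state under key; returns the handle that would be scheduled
    // for deletion, or null if nothing would be deleted.
    Object register(String key, Object state) {
        Entry entry = entries.get(key);
        if (entry == null) {
            entries.put(key, new Entry(state));
            return null;
        }
        Object scheduledStateDeletion = null;
        if (!Objects.equals(state, entry.stateHandle)) {
            if (entry.confirmed) {
                // Keep the confirmed entry; discard the newly registered handle.
                scheduledStateDeletion = state;
            } else {
                // Replace the unconfirmed entry; discard the old handle.
                scheduledStateDeletion = entry.stateHandle;
                entry.stateHandle = state;
            }
        }
        return scheduledStateDeletion;
    }

    // Marks the entry as belonging to a completed checkpoint.
    void confirm(String key) {
        entries.get(key).confirmed = true;
    }
}
{code}

For example, registering {{new IdentityHandle("/chk/xyz-01.sst")}}, confirming it, and then registering a second {{new IdentityHandle("/chk/xyz-01.sst")}} under the same key returns the second instance as the handle to delete, although both point to the same file.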
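Handles that do not override {{#equals}} are compared by reference in {{Objects.equals}}, so a re-registered handle that refers to the same materialized state but is a different object instance is treated as a different state. Thus, we need to implement the {{#equals}} method for the registered state handles.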
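A minimal sketch of value-based equality for a registered handle, assuming a hypothetical {{FileHandleSketch}} identified by its file path and state size. The real Flink handle classes, and the fields they compare, may differ. {{hashCode}} is overridden together with {{equals}}, as the {{java.lang.Object}} contract requires.

{code:java}
import java.util.Objects;

// Hypothetical file-backed state handle with value-based equality:
// two instances are equal when they describe the same file (path and size),
// regardless of whether they are the same object.
final class FileHandleSketch {
    private final String filePath;
    private final long stateSize;

    FileHandleSketch(String filePath, long stateSize) {
        this.filePath = Objects.requireNonNull(filePath);
        this.stateSize = stateSize;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FileHandleSketch that = (FileHandleSketch) o;
        return stateSize == that.stateSize && filePath.equals(that.filePath);
    }

    @Override
    public int hashCode() {
        // Consistent with equals: equal handles produce equal hash codes.
        return Objects.hash(filePath, stateSize);
    }
}
{code}

With such an {{equals}}, the condition {{!Objects.equals(state, entry.stateHandle)}} in the snippet above is false for a re-registered copy of the same handle, so neither branch runs and nothing is scheduled for deletion.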
--
This message was sent by Atlassian Jira
(v8.20.1#820001)