[ https://issues.apache.org/jira/browse/FLINK-32130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Feifan Wang updated FLINK-32130:
--------------------------------
Description:
Currently, _SharedStateRegistryImpl_ discards the old state when a new state
is registered under the same key:
{code:java}
// Old entry is not in a confirmed checkpoint yet, and the new one differs.
// This might result from (omitted KG range here for simplicity):
// 1. Flink recovers from a failure using a checkpoint 1
// 2. State Backend is initialized to UID xyz and a set of SST: { 01.sst }
// 3. JM triggers checkpoint 2
// 4. TM sends handle: "xyz-002.sst"; JM registers it under "xyz-002.sst"
// 5. TM crashes; everything is repeated from (2)
// 6. TM recovers from CP 1 again: backend UID "xyz", SST { 01.sst }
// 7. JM triggers checkpoint 3
// 8. TM sends NEW state "xyz-002.sst"
// 9. JM discards it as duplicate
// 10. checkpoint completes, but a wrong SST file is used
// So we use a new entry and discard the old one:
LOG.info(
        "Duplicated registration under key {} of a new state: {}. "
                + "This might happen during the task failover if state backend creates different states with the same key before and after the failure. "
                + "Discarding the OLD state and keeping the NEW one which is included into a completed checkpoint",
        registrationKey,
        newHandle);
scheduledStateDeletion = entry.stateHandle;
entry.stateHandle = newHandle;
{code}
But if _execution.checkpointing.max-concurrent-checkpoints_ > 1, the following
case fails (taking _RocksDBStateBackend_ as an example):
# cp1 is triggered: 1.sst is uploaded to file-1 and registered as
<1.sst,file-1>; cp1 references file-1.
# cp1 is not yet complete when cp2 is triggered: 1.sst is uploaded to file-2
and registered as <1.sst,file-2>. SharedStateRegistry discards file-1.
# cp1 completes and cp2 fails, but cp1 is broken (file-1 has been deleted).
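The three steps above can be modeled with a toy registry. This is only a sketch of the overwrite-on-duplicate behavior, not Flink's _SharedStateRegistryImpl_: the class names _OverwritingRegistry_ and _OverwritingRegistryDemo_ are hypothetical, and plain file names stand in for state handles.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only (hypothetical names); file names stand in for state handles.
class OverwritingRegistry {
    // registration key -> file currently registered under that key
    private final Map<String, String> entries = new HashMap<>();
    // files that the registry has scheduled for deletion
    private final Set<String> deletedFiles = new HashSet<>();

    /** Registers a file under a key; an existing, different file is discarded. */
    String register(String key, String file) {
        String old = entries.put(key, file);
        if (old != null && !old.equals(file)) {
            // Stands in for "scheduledStateDeletion = entry.stateHandle".
            deletedFiles.add(old);
        }
        return file;
    }

    boolean isDeleted(String file) {
        return deletedFiles.contains(file);
    }
}

class OverwritingRegistryDemo {
    public static void main(String[] args) {
        OverwritingRegistry registry = new OverwritingRegistry();
        // Step 1: cp1 uploads 1.sst to file-1 and references it.
        String cp1File = registry.register("1.sst", "file-1");
        // Step 2: cp2 starts before cp1 completes and re-uploads 1.sst to file-2.
        registry.register("1.sst", "file-2");
        // Step 3: cp1 completes and cp2 fails, yet cp1 still points at file-1.
        System.out.println("cp1 file deleted: " + registry.isDeleted(cp1File));
    }
}
```

In this model the file that cp1 references is already marked deleted by the time cp1 completes, which is the broken-checkpoint case described above.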
I added a test to reproduce the problem
([pr-22606|https://github.com/apache/flink/pull/22606]).
I think we should allow registering multiple state objects under the same key.
WDYT, [~pnowojski], [~roman]?
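One possible reading of this proposal, as a hypothetical sketch rather than an actual Flink change: keep every file registered under a key together with the ids of the checkpoints that reference it, and delete a file only once no checkpoint references it any longer. The class _MultiHandleRegistry_ and its methods are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Flink code; file names stand in for state handles.
class MultiHandleRegistry {
    // registration key -> (file -> ids of the checkpoints that reference it)
    private final Map<String, Map<String, Set<Long>>> entries = new HashMap<>();

    /** Registers a file under a key without discarding other files under that key. */
    void register(String key, String file, long checkpointId) {
        entries.computeIfAbsent(key, k -> new HashMap<>())
                .computeIfAbsent(file, f -> new HashSet<>())
                .add(checkpointId);
    }

    /**
     * Drops the references held by a failed or subsumed checkpoint and
     * returns the files that no checkpoint references any more.
     */
    List<String> release(long checkpointId) {
        List<String> toDelete = new ArrayList<>();
        for (Map<String, Set<Long>> files : entries.values()) {
            Iterator<Map.Entry<String, Set<Long>>> it = files.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Set<Long>> e = it.next();
                if (e.getValue().remove(checkpointId) && e.getValue().isEmpty()) {
                    toDelete.add(e.getKey());
                    it.remove();
                }
            }
        }
        return toDelete;
    }

    boolean contains(String key, String file) {
        Map<String, Set<Long>> files = entries.get(key);
        return files != null && files.containsKey(file);
    }
}
```

With this shape, registering <1.sst,file-2> for cp2 leaves file-1 in place, and releasing the failed cp2 returns only file-2 for deletion, so cp1 stays intact.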
> previous checkpoint will be broken by the subsequent incremental checkpoint
> --------------------------------------------------------------------------
>
> Key: FLINK-32130
> URL: https://issues.apache.org/jira/browse/FLINK-32130
> Project: Flink
> Issue Type: Bug
> Reporter: Feifan Wang
> Priority: Major
>