[
https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725470#comment-17725470
]
Feifan Wang commented on FLINK-29913:
-------------------------------------
Thanks for the clarification, [~roman]!
{quote}Further, regarding the unique registry key approach, I agree with
Congxian Qiu: we can simply choose a stable registry key generation method
based on the remote file name (for example, the MD5 digest of the remote file
name), which can replace
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName().
The mapping from local SST file name to StreamStateHandle never changes, so the
RocksDB recovery part does not need to change.
{quote}
I mean we still use the local file name as the key of the sharedState map in
_*IncrementalRemoteKeyedStateHandle*_, and use the remote file path when
generating the SharedStateRegistryKey. The changes in
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...
// Still keyed by local file name; this is the mapping that "never changes",
// as mentioned above.
private final Map<StateHandleID, StreamStateHandle> sharedState;
...
public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
            sharedState.entrySet()) {
        // Changed line: derive the registry key from the remote handle
        // instead of the local file name.
        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue());
        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);
        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        // Fallback for any other handle type.
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    // md5sum is a placeholder; another digest algorithm may be used instead.
    return new SharedStateRegistryKey(md5sum(keyString));
}
{code}
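For illustration only, the md5sum(...) helper referenced in the snippet above could be sketched as follows. This is a standalone example with no Flink dependency: the class name RegistryKeySketch and the remote paths are hypothetical, and the JDK's MessageDigest is just one possible way to compute the digest. It shows that two handles sharing a local SST file name still get distinct keys, because the key is derived from the remote path.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class RegistryKeySketch {

    /** Returns the lowercase hex MD5 digest of the given string (UTF-8 encoded). */
    static String md5sum(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical remote paths: two concurrent checkpoints uploaded a file
        // with the same local name (e.g. 000042.sst) to different remote locations.
        String remoteA = "hdfs:///checkpoints/job/shared/uuid-a";
        String remoteB = "hdfs:///checkpoints/job/shared/uuid-b";

        String keyA = md5sum(remoteA);
        String keyB = md5sum(remoteB);

        // Same remote path gives the same key; different paths give different keys.
        System.out.println("key A: " + keyA);
        System.out.println("key B: " + keyB);
        System.out.println("stable:   " + keyA.equals(md5sum(remoteA)));
        System.out.println("distinct: " + !keyA.equals(keyB));
    }
}
```

Because the key depends only on the remote path string, it stays the same across restarts for FileStateHandle and ByteStreamStateHandle inputs.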
> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -------------------------------------------------------------------------
>
> Key: FLINK-29913
> URL: https://issues.apache.org/jira/browse/FLINK-29913
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.15.0, 1.16.0, 1.17.0
> Reporter: Yanfei Lei
> Assignee: Feifan Wang
> Priority: Major
> Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, shared state of the incremental RocksDB state
> backend can be discarded by mistake when a handle with the same name is
> registered. See
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman]