rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1210462542
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##########
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
}
+ @Test
+ public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+ StateHandleID handleID = new StateHandleID("1.sst");
+ StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+ StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+ SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+ UUID backendID = UUID.randomUUID();
+
+ IncrementalRemoteKeyedStateHandle handle1 =
+ new IncrementalRemoteKeyedStateHandle(
+ backendID,
+ KeyGroupRange.of(0, 0),
+ 1L,
+ placeSpies(
+ new HashMap<StateHandleID, StreamStateHandle>() {
+ {
+ put(handleID, streamHandle1);
+ }
+ }),
Review Comment:
`Collections.singletonMap`?
ditto: line 279
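For illustration, a minimal sketch of the suggested replacement. It uses `String` keys and values as stand-ins for `StateHandleID` and `StreamStateHandle`, and the class and method names are hypothetical. One caveat, stated as an assumption because `placeSpies` is not shown in this hunk: `Collections.singletonMap` returns an immutable map, so if the helper rewrites entries in place, the map would need to be copied into a mutable one first.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class SingletonMapSketch {
    // Double-brace initialization, as in the test under review: this creates
    // an anonymous HashMap subclass just to hold a single entry.
    static Map<String, String> doubleBrace(String key, String value) {
        return new HashMap<String, String>() {
            {
                put(key, value);
            }
        };
    }

    // Suggested alternative: an immutable one-entry map.
    static Map<String, String> singleton(String key, String value) {
        return Collections.singletonMap(key, value);
    }

    // Mutable variant, in case the consumer modifies the map it receives.
    static Map<String, String> mutableSingleton(String key, String value) {
        return new HashMap<>(Collections.singletonMap(key, value));
    }

    public static void main(String[] args) {
        // All three maps hold the same single entry.
        System.out.println(doubleBrace("1.sst", "file-1").equals(singleton("1.sst", "file-1")));
        System.out.println(singleton("1.sst", "file-1").equals(mutableSingleton("1.sst", "file-1")));
    }
}
```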
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -395,18 +394,16 @@ public void release() {
/** Previous snapshot with uploaded sst files. */
protected static class PreviousSnapshot {
- @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+ @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;
- protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+ protected PreviousSnapshot(
+ @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
this.confirmedSstFiles = confirmedSstFiles;
}
protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
- // we introduce a placeholder state handle, that is replaced with the
- // original from the shared state registry (created from a previous checkpoint)
- return Optional.of(
- new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
Review Comment:
Could you elaborate on why you had to delete `PlaceholderStreamStateHandle`?
I'm concerned that with this change, `ByteStreamStateHandle`s will always be
sent to the JM (regardless of whether they were previously sent or not).
##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##########
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
}
+ @Test
+ public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+ StateHandleID handleID = new StateHandleID("1.sst");
+ StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+ StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+ SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+ UUID backendID = UUID.randomUUID();
+
+ IncrementalRemoteKeyedStateHandle handle1 =
+ new IncrementalRemoteKeyedStateHandle(
+ backendID,
+ KeyGroupRange.of(0, 0),
+ 1L,
+ placeSpies(
+ new HashMap<StateHandleID, StreamStateHandle>() {
+ {
+ put(handleID, streamHandle1);
+ }
+ }),
+ Collections.emptyMap(),
+ new ByteStreamStateHandle("", new byte[] {'s'}));
+
+ handle1.registerSharedStates(registry, handle1.getCheckpointId());
+
+ IncrementalRemoteKeyedStateHandle handle2 =
+ new IncrementalRemoteKeyedStateHandle(
+ backendID,
+ KeyGroupRange.of(0, 0),
+ 2L,
+ placeSpies(
+ new HashMap<StateHandleID, StreamStateHandle>() {
+ {
+ put(handleID, streamHandle2);
+ }
+ }),
+ Collections.emptyMap(),
+ new ByteStreamStateHandle("", new byte[] {'s'}));
+
+ handle2.registerSharedStates(registry, handle2.getCheckpointId());
+
+ registry.checkpointCompleted(1L);
+ registry.checkpointCompleted(2L);
Review Comment:
Shouldn't this 2nd checkpoint be aborted? Otherwise, why would any state be
discarded?
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -395,18 +394,16 @@ public void release() {
/** Previous snapshot with uploaded sst files. */
protected static class PreviousSnapshot {
- @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+ @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;
- protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+ protected PreviousSnapshot(
+ @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
this.confirmedSstFiles = confirmedSstFiles;
}
protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
- // we introduce a placeholder state handle, that is replaced with the
- // original from the shared state registry (created from a previous checkpoint)
- return Optional.of(
- new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
Review Comment:
(otherwise, the branching code here can be simplified with
`Optional.ofNullable`)
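A rough sketch of that simplification, assuming the branch now just returns the handle stored in the map (the new return statement is not visible in this hunk). `String` stands in for both `StateHandleID` and `StreamStateHandle`, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

class GetUploadedSketch {
    // Branching form, mirroring the shape of the method under review.
    static Optional<String> branching(Map<String, String> confirmed, String id) {
        if (confirmed != null && confirmed.containsKey(id)) {
            return Optional.of(confirmed.get(id));
        }
        return Optional.empty();
    }

    // Simplified form: a null map or a missing key both yield an empty Optional,
    // because Optional.map turns a null mapping result into Optional.empty().
    static Optional<String> simplified(Map<String, String> confirmed, String id) {
        return Optional.ofNullable(confirmed).map(m -> m.get(id));
    }
}
```

The two forms agree as long as the map never stores `null` values; with a `null` value the branching form would throw from `Optional.of`, while the simplified form returns an empty `Optional`.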
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##########
@@ -361,9 +362,10 @@ IncrementalRemoteKeyedStateHandle copy() {
/** Create a unique key to register one of our shared state handles. */
@VisibleForTesting
- public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+ public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) {
+ String keyString = handle.getStreamStateHandleID().getKeyString();
return new SharedStateRegistryKey(
- String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+ UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());
Review Comment:
This won't work when multiple handles point to the same file (but use
different offsets), right?
Should we switch to using completely random IDs (unrelated to either remote or
local file paths)?
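To make the concern concrete, a small sketch of the difference between the two ID schemes. `UUID.nameUUIDFromBytes` is a pure function of its input, so two handles whose key strings are equal get the same registry key; `UUID.randomUUID` does not have that property. Whether the key string returned by `getStreamStateHandleID()` encodes offsets is not shown in this diff, so the collision scenario is an assumption, and the path used below is a hypothetical example.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

class RegistryKeySketch {
    // Name-based UUID: deterministic, identical for identical key strings.
    static String nameBasedKey(String keyString) {
        return UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // Random UUID: independent of any remote or local path.
    static String randomKey() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // Hypothetical remote path shared by two handles at different offsets.
        String sharedPath = "hdfs:///checkpoints/shared/file-1";
        System.out.println("name-based keys equal: "
                + nameBasedKey(sharedPath).equals(nameBasedKey(sharedPath)));
        System.out.println("random keys equal: " + randomKey().equals(randomKey()));
    }
}
```

Random IDs avoid the collision, but they would need to be generated once per uploaded file and then carried along with the handle so that later checkpoints can refer to the same entry.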
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##########
@@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo
for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+ createSharedStateRegistryKey(sharedStateHandle.getValue());
Review Comment:
With this change, the key in `IncrementalRemoteKeyedStateHandle.sharedState`
is not actually a key anymore, but just a local file name used on recovery.
So keeping it as is seems confusing to me.
I think it would be clearer if we kept the property that the
SharedStateRegistry key is the same as the key in this map. That would require
storing the local file path explicitly (which is good), for example by adding a
holder class for `StreamStateHandle` and the local path.
WDYT?
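Something along these lines, as a rough sketch only. `HandleAndLocalPath` is a hypothetical name, and the nested `Handle` interface is a stand-in for `StreamStateHandle` so that the snippet compiles on its own.

```java
import java.util.Objects;

class HandleAndLocalPathSketch {
    /** Stand-in for Flink's StreamStateHandle, so the sketch is self-contained. */
    interface Handle {
        String id();
    }

    /** Hypothetical holder pairing an uploaded handle with its local file path. */
    static final class HandleAndLocalPath {
        private final Handle handle;
        private final String localPath;

        HandleAndLocalPath(Handle handle, String localPath) {
            this.handle = Objects.requireNonNull(handle);
            this.localPath = Objects.requireNonNull(localPath);
        }

        Handle getHandle() {
            return handle;
        }

        String getLocalPath() {
            return localPath;
        }
    }

    public static void main(String[] args) {
        HandleAndLocalPath entry = new HandleAndLocalPath(() -> "file-1", "1.sst");
        System.out.println(entry.getLocalPath() + " -> " + entry.getHandle().id());
    }
}
```

With such a holder as the map value, the map key could again be the registry key, and the local file name needed on recovery would be read from the holder instead of from the key.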