Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116161318
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java
 ---
    @@ -180,69 +176,66 @@ public long getStateSize() {
     
        @Override
        public void registerSharedStates(SharedStateRegistry stateRegistry) {
    +
                Preconditions.checkState(!registered, "The state handle has 
already registered its shared states.");
     
    -           for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
newSstFiles.entrySet()) {
    -                   SstFileStateHandle stateHandle = new 
SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue());
    +           for (Map.Entry<String, StreamStateHandle> newSstFileEntry : 
unregisteredSstFiles.entrySet()) {
    +                   SharedStateRegistryKey registryKey =
    +                           
createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
     
    -                   int referenceCount = 
stateRegistry.register(stateHandle);
    -                   Preconditions.checkState(referenceCount == 1);
    +                   SharedStateRegistry.Result result =
    +                           stateRegistry.registerNewReference(registryKey, 
newSstFileEntry.getValue());
    +
    +                   // We update our reference with the result from the 
registry, to prevent the following
    +                   // problem:
    +                   // A previous checkpoint n has already registered the 
state. This can happen if a
    +                   // following checkpoint (n + x) wants to reference the 
same state before the backend got
    +                   // notified that checkpoint n completed. In this case, 
the shared registry did
    +                   // deduplication and returns the previous reference.
    +                   newSstFileEntry.setValue(result.getReference());
                }
     
    -           for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : 
oldSstFiles.entrySet()) {
    -                   SstFileStateHandle stateHandle = new 
SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue());
    +           for (Map.Entry<String, StreamStateHandle> oldSstFileName : 
registeredSstFiles.entrySet()) {
    --- End diff --
    
    Similar to the previous comment, `oldSstFileName` can be renamed to 
`unregisteredSstFileEntry` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to