dawidwys commented on a change in pull request #18539:
URL: https://github.com/apache/flink/pull/18539#discussion_r795709706



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
##########
@@ -224,19 +289,27 @@ public void close() {
         }
     }
 
+    @FunctionalInterface
+    private interface PostDispose {
+        void execute() throws Exception;
+    }
+
     /** Encapsulates the operation the delete state handles asynchronously. */
     private static final class AsyncDisposalRunnable implements Runnable {
 
-        private final StateObject toDispose;
+        private final StreamStateHandle toDispose;
+        private final PostDispose postDispose;
 
-        public AsyncDisposalRunnable(StateObject toDispose) {
+        public AsyncDisposalRunnable(StreamStateHandle toDispose, PostDispose 
postDispose) {

Review comment:
       `registryKey` is unfortunately not enough. 
   
   When `AsyncDisposalRunnable` is created from `registerReference` we should 
not remove the key from the `restoredSharedStates`, as there it is used for 
simply discarding a duplicated file that is not tracked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to