Myasuka commented on a change in pull request #19050:
URL: https://github.com/apache/flink/pull/19050#discussion_r824419394



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -204,22 +205,25 @@ public SequenceNumber nextSequenceNumber() {
             SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber);
             if (range.size() == readyToReturn.size()) {
                 checkState(toUpload.isEmpty());
-                return completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L));
+                future.complete(buildHandle(keyGroupRange, readyToReturn, 0L));
             } else {
-                CompletableFuture<ChangelogStateHandleStreamImpl> future =
-                        new CompletableFuture<>();
                 uploadCompletionListeners.add(
                         new UploadCompletionListener(keyGroupRange, range, readyToReturn, future));
                 if (!toUpload.isEmpty()) {
-                    uploader.upload(
+                    uploadTask =
                             new UploadTask(
                                     toUpload.values(),
                                     this::handleUploadSuccess,
-                                    this::handleUploadFailure));
+                                    this::handleUploadFailure);
                 }
-                return future;
             }
         }
+        // upload outside the synchronized block to prevent deadlock: capacity might be needed,
+        // but upload threads need to acquire lock in completion callbacks
+               if (uploadTask != null) {
+                       uploader.upload(uploadTask);
+               }

Review comment:
   If we dig into the root cause, the deadlock happens because 
`StateChangeUploadScheduler#upload` can block indefinitely, and it only stops 
waiting once `FsStateChangelogWriter#handleUploadSuccess` has finished. This 
creates a dependency between the two that is not obvious from the interface 
design.
   
   Put differently, `FsStateChangelogWriter` should not need to know how 
`StateChangeUploadScheduler` is implemented, and the concept of `capacity` in 
`StateChangeUploadScheduler` should not be visible to the caller (the new 
comment actually exposes this concept).
   
   In my opinion, a better solution is to let `StateChangeUploadScheduler` 
handle this deadlock itself: `StateChangeUploadScheduler#upload` should not 
block, and a separate asynchronous step should wait for capacity to be 
released. I have not implemented this, but I think it would be cleaner than the 
current solution.
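
   To make the idea concrete, here is a minimal sketch of a non-blocking 
admission step. It is not the actual `StateChangeUploadScheduler` API: the 
class `NonBlockingUploadScheduler`, its methods, and the byte-based capacity 
accounting are all hypothetical and only illustrate the shape of the change.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch only; names and signatures are not taken from Flink. */
class NonBlockingUploadScheduler {

    /** A request that could not be admitted immediately. */
    private static final class PendingUpload {
        final long sizeBytes;
        final CompletableFuture<Void> admitted;

        PendingUpload(long sizeBytes, CompletableFuture<Void> admitted) {
            this.sizeBytes = sizeBytes;
            this.admitted = admitted;
        }
    }

    private final Object lock = new Object();
    private final Queue<PendingUpload> waiting = new ArrayDeque<>();
    private long availableBytes;

    NonBlockingUploadScheduler(long capacityBytes) {
        this.availableBytes = capacityBytes;
    }

    /**
     * Never blocks. The returned future completes once capacity has been
     * reserved for the request; the caller chains the real upload onto it.
     * Note: a request larger than the total capacity would wait forever here.
     */
    CompletableFuture<Void> upload(long sizeBytes) {
        CompletableFuture<Void> admitted = new CompletableFuture<>();
        boolean grantNow;
        synchronized (lock) {
            // Keep FIFO order: do not overtake requests that are already waiting.
            grantNow = waiting.isEmpty() && sizeBytes <= availableBytes;
            if (grantNow) {
                availableBytes -= sizeBytes;
            } else {
                waiting.add(new PendingUpload(sizeBytes, admitted));
            }
        }
        if (grantNow) {
            admitted.complete(null); // completed outside the scheduler lock
        }
        return admitted;
    }

    /** Called when an upload finishes; admits as many waiting requests as fit. */
    void release(long sizeBytes) {
        List<CompletableFuture<Void>> toGrant = new ArrayList<>();
        synchronized (lock) {
            availableBytes += sizeBytes;
            while (!waiting.isEmpty() && waiting.peek().sizeBytes <= availableBytes) {
                PendingUpload next = waiting.poll();
                availableBytes -= next.sizeBytes;
                toGrant.add(next.admitted);
            }
        }
        // Complete futures outside the lock so callbacks cannot deadlock on it.
        for (CompletableFuture<Void> future : toGrant) {
            future.complete(null);
        }
    }

    long availableBytes() {
        synchronized (lock) {
            return availableBytes;
        }
    }
}
```

   With something like this, the writer could call `upload` while still holding 
its own lock, because the call returns immediately and the wait for capacity 
happens in the returned future. Callbacks chained with `thenRun` would run on 
the thread that calls `release`, so a real implementation would probably hand 
them to an executor instead.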



