xintongsong commented on code in PR #20796:
URL: https://github.com/apache/flink/pull/20796#discussion_r966838325


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java:
##########
@@ -340,6 +341,57 @@ void testRegisterSubpartitionReaderAfterReleased() {
                 .hasMessageContaining("HsFileDataManager is already released.");
     }
 
+    /**
+     * When the result partition fails, the view lock may be obtained when the FileDataManager lock
+     * is held. In the same time, the downstream thread will acquire the lock of the FileDataManager
+     * when acquiring the view lock. To avoid this deadlock, the logical of subpartition view
+     * release subpartition reader and subpartition reader fail should not be inside lock.
+     */
+    @Test
+    void testDeadLock() throws Exception {
+        TestingHsSubpartitionFileReader reader = new TestingHsSubpartitionFileReader();
+        CompletableFuture<Void> consumerStart = new CompletableFuture<>();
+        CompletableFuture<Void> readerFail = new CompletableFuture<>();
+        HsSubpartitionView subpartitionView =
+                new HsSubpartitionView(new NoOpBufferAvailablityListener());
+        reader.setFailConsumer(
+                (throwable) -> {
+                    try {
+                        readerFail.complete(null);
+                        consumerStart.get();
+                        // try to get view lock.
+                        subpartitionView.notifyDataAvailable();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });

Review Comment:
   This feels like whitebox testing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionView.java:
##########
@@ -85,10 +86,12 @@ public BufferAndBacklog getNextBuffer() {
                 updateConsumingStatus(bufferToConsume);
                 return bufferToConsume.map(this::handleBacklog).orElse(null);
             } catch (Throwable cause) {
-                releaseInternal(cause);
-                return null;
+                readError = cause;
             }
         }
+        // release subpartition reader outside of lock to avoid deadlock.
+        releaseInternal(readError);
+        return null;

Review Comment:
   We can simply move the `synchronized` block inside the `try` block, so that the `catch` clause (and with it `releaseInternal`) runs after the lock has been released.
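
   A minimal sketch of that shape, not the actual `HsSubpartitionView` code: the class `ViewSketch`, the `failRead` flag, and the `String` return value are hypothetical stand-ins. The monitor is released when the exception leaves the `synchronized` block, so the handler should run without holding the view lock.

   ```java
   // Hypothetical stand-in for the view; only the try/synchronized nesting matters here.
   class ViewSketch {
       private final Object lock = new Object();
       private Throwable failureCause;
       // Records whether releaseInternal ran while the current thread still held the lock.
       boolean releasedWhileHoldingLock;

       String getNextBuffer(boolean failRead) {
           try {
               synchronized (lock) {
                   if (failRead) {
                       throw new IllegalStateException("read failed");
                   }
                   return "buffer";
               }
           } catch (Throwable cause) {
               // The exception has already left the synchronized block,
               // so the lock is no longer held at this point.
               releaseInternal(cause);
               return null;
           }
       }

       private void releaseInternal(Throwable cause) {
           releasedWhileHoldingLock = Thread.holdsLock(lock);
           failureCause = cause;
       }

       Throwable getFailureCause() {
           return failureCause;
       }
   }
   ```

   With this nesting there is no need for a separate `readError` variable or a second exit path after the lock-guarded block.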



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java:
##########
@@ -340,6 +341,57 @@ void testRegisterSubpartitionReaderAfterReleased() {
                 .hasMessageContaining("HsFileDataManager is already released.");
     }
 
+    /**
+     * When the result partition fails, the view lock may be obtained when the FileDataManager lock
+     * is held. In the same time, the downstream thread will acquire the lock of the FileDataManager
+     * when acquiring the view lock. To avoid this deadlock, the logical of subpartition view
+     * release subpartition reader and subpartition reader fail should not be inside lock.
+     */
+    @Test
+    void testDeadLock() throws Exception {

Review Comment:
   ```suggestion
       void testConsumeWhileReleaseNoDeadlock() throws Exception {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java:
##########
@@ -192,19 +192,22 @@ public void releaseSubpartitionReader(HsSubpartitionFileReader subpartitionFileR
 
     /** Releases this file data manager and delete shuffle data after all readers is removed. */
     public void release() {
+        List<HsSubpartitionFileReader> pendingReaders;
         synchronized (lock) {
             if (isReleased) {
                 return;
             }
             isReleased = true;
 
-            List<HsSubpartitionFileReader> pendingReaders = new ArrayList<>(allReaders);
+            pendingReaders = new ArrayList<>(allReaders);
             mayNotifyReleased();
+            // delete the shuffle file only when no reader is reading now.
+            releaseFuture.thenRun(this::deleteShuffleFile);
+        }
+        if (!pendingReaders.isEmpty()) {

Review Comment:
   I'm not sure about calling `failSubpartitionReaders` outside the lock-guarded scope. The problem is that readers are not removed from `allReaders` until `failSubpartitionReaders` is called, so once the lock is dropped the manager is marked as released while `allReaders` still contains them.
   
   I don't see why this change is necessary.
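
   A simplified sketch of the window I have in mind. This is not the real `HsFileDataManager`: the class `ReleaseSketch`, its `String` readers, and the helper `releasedButReadersRemain` are hypothetical, and the only assumption carried over is that readers leave `allReaders` when they are failed.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical model: snapshot the readers under the lock, fail them outside of it.
   class ReleaseSketch {
       private final Object lock = new Object();
       private final List<String> allReaders = new ArrayList<>();
       private boolean isReleased;

       void register(String reader) {
           synchronized (lock) {
               allReaders.add(reader);
           }
       }

       // First half of release(): mark as released and copy the readers under the lock.
       List<String> markReleasedAndSnapshot() {
           synchronized (lock) {
               isReleased = true;
               return new ArrayList<>(allReaders);
           }
       }

       // Second half: failing the readers is what removes them from allReaders.
       void failReaders(List<String> readers) {
           synchronized (lock) {
               allReaders.removeAll(readers);
           }
       }

       // What any other thread taking the lock in between would observe.
       boolean releasedButReadersRemain() {
           synchronized (lock) {
               return isReleased && !allReaders.isEmpty();
           }
       }
   }
   ```

   Between `markReleasedAndSnapshot` and `failReaders`, another thread can take the lock and see a released manager that still has registered readers. Keeping both steps inside one `synchronized` block would rule that state out.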



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
