This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed4937cbc0ee73e08856cc59bfbf788682c70a09
Author: Wencong Liu <[email protected]>
AuthorDate: Sun Aug 13 14:44:51 2023 +0800

    [hotfix] Fix the deadlock of DiskIOScheduler#sortScheduledReaders of Hybrid Shuffle
---
 .../hybrid/tiered/tier/disk/DiskIOScheduler.java   | 14 +++----
 .../hybrid/tiered/file/DiskIOSchedulerTest.java    | 47 +++++++++++++++++++++-
 2 files changed, 53 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
index 9f6e6036cc5..d8d1c6fcce3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java
@@ -234,18 +234,18 @@ public class DiskIOScheduler implements Runnable, 
BufferRecycler, NettyServicePr
     }
 
     private List<ScheduledSubpartitionReader> sortScheduledReaders() {
+        List<ScheduledSubpartitionReader> scheduledReaders;
         synchronized (lock) {
             if (isReleased) {
                 return new ArrayList<>();
             }
-            List<ScheduledSubpartitionReader> scheduledReaders = new 
ArrayList<>();
-            for (ScheduledSubpartitionReader reader : 
allScheduledReaders.values()) {
-                reader.prepareForScheduling();
-                scheduledReaders.add(reader);
-            }
-            Collections.sort(scheduledReaders);
-            return scheduledReaders;
+            scheduledReaders = new ArrayList<>(allScheduledReaders.values());
+        }
+        for (ScheduledSubpartitionReader reader : scheduledReaders) {
+            reader.prepareForScheduling();
         }
+        Collections.sort(scheduledReaders);
+        return scheduledReaders;
     }
 
     private Queue<MemorySegment> allocateBuffers() throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
index 75022ba8a91..98bccb530f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file;
 
+import org.apache.flink.core.testutils.CheckedThread;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
@@ -26,9 +27,11 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyConnectionWriter;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -40,6 +43,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -145,7 +149,7 @@ class DiskIOSchedulerTest {
                         .setWriteBufferFunction(
                                 nettyPayload -> {
                                     if (nettyPayload.getSegmentId() == -1) {
-                                        
bufferWriteNotifier1.complete(nettyPayload);
+                                        
bufferWriteNotifier2.complete(nettyPayload);
                                     }
                                     return null;
                                 })
@@ -206,6 +210,47 @@ class DiskIOSchedulerTest {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    /**
+     * The {@link DiskIOScheduler} shouldn't hold the lock when sending a {@link NettyPayload} with
+     * a segment id to the {@link NettyConnectionWriter}; otherwise a deadlock may occur when the
+     * downstream tries to acquire the lock in {@link DiskIOScheduler}.
+     */
+    @Test
+    void testDeadLock() {
+        CompletableFuture<NettyPayload> waitFuture1 = new 
CompletableFuture<>();
+        CompletableFuture<NettyPayload> waitFuture2 = new 
CompletableFuture<>();
+        TestingNettyConnectionWriter nettyConnectionWriter =
+                new TestingNettyConnectionWriter.Builder()
+                        .setWriteBufferFunction(
+                                nettyPayload -> {
+                                    try {
+                                        waitFuture2.complete(null);
+                                        waitFuture1.get();
+                                    } catch (InterruptedException | 
ExecutionException e) {
+                                        ExceptionUtils.rethrow(e);
+                                    }
+                                    return null;
+                                })
+                        .build();
+        // Test if consumer thread can get the lock correctly.
+        CheckedThread consumerThread =
+                new CheckedThread() {
+                    @Override
+                    public void go() throws Exception {
+                        waitFuture2.get();
+                        // Get the lock in disk io scheduler.
+                        diskIOScheduler.release();
+                        waitFuture1.complete(null);
+                    }
+                };
+        consumerThread.start();
+        diskIOScheduler.connectionEstablished(
+                new TieredStorageSubpartitionId(0), nettyConnectionWriter);
+        ioExecutor.trigger();
+        assertThat(waitFuture1).isDone();
+        assertThat(waitFuture2).isDone();
+    }
+
     private List<Map<Integer, Integer>> createFirstBufferIndexInSegment() {
         Map<Integer, Integer> firstBufferIndexInSegment0 = new HashMap<>();
         Map<Integer, Integer> firstBufferIndexInSegment1 = new HashMap<>();

Reply via email to