This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ed4937cbc0ee73e08856cc59bfbf788682c70a09 Author: Wencong Liu <[email protected]> AuthorDate: Sun Aug 13 14:44:51 2023 +0800 [hotfix] Fix the deadlock of DiskIOScheduler#sortScheduledReaders of Hybrid Shuffle --- .../hybrid/tiered/tier/disk/DiskIOScheduler.java | 14 +++---- .../hybrid/tiered/file/DiskIOSchedulerTest.java | 47 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java index 9f6e6036cc5..d8d1c6fcce3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskIOScheduler.java @@ -234,18 +234,18 @@ public class DiskIOScheduler implements Runnable, BufferRecycler, NettyServicePr } private List<ScheduledSubpartitionReader> sortScheduledReaders() { + List<ScheduledSubpartitionReader> scheduledReaders; synchronized (lock) { if (isReleased) { return new ArrayList<>(); } - List<ScheduledSubpartitionReader> scheduledReaders = new ArrayList<>(); - for (ScheduledSubpartitionReader reader : allScheduledReaders.values()) { - reader.prepareForScheduling(); - scheduledReaders.add(reader); - } - Collections.sort(scheduledReaders); - return scheduledReaders; + scheduledReaders = new ArrayList<>(allScheduledReaders.values()); + } + for (ScheduledSubpartitionReader reader : scheduledReaders) { + reader.prepareForScheduling(); } + Collections.sort(scheduledReaders); + return scheduledReaders; } private Queue<MemorySegment> allocateBuffers() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java index 75022ba8a91..98bccb530f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/DiskIOSchedulerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.file; +import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; @@ -26,9 +27,11 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TestingNettyConnectionWriter; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskIOScheduler; +import org.apache.flink.util.ExceptionUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -145,7 +149,7 @@ class DiskIOSchedulerTest { .setWriteBufferFunction( nettyPayload -> { if (nettyPayload.getSegmentId() == -1) { - bufferWriteNotifier1.complete(nettyPayload); + bufferWriteNotifier2.complete(nettyPayload); } return null; }) @@ -206,6 +210,47 @@ class DiskIOSchedulerTest { .isInstanceOf(IllegalStateException.class); } + /** + * The {@link DiskIOScheduler} shouldn't hold the lock when sending {@link NettyPayload} with + * segment id to {@link NettyConnectionWriter}, otherwise there may happen a deadlock when the + * downstream is trying to request the lock in {@link DiskIOScheduler}. + */ + @Test + void testDeadLock() { + CompletableFuture<NettyPayload> waitFuture1 = new CompletableFuture<>(); + CompletableFuture<NettyPayload> waitFuture2 = new CompletableFuture<>(); + TestingNettyConnectionWriter nettyConnectionWriter = + new TestingNettyConnectionWriter.Builder() + .setWriteBufferFunction( + nettyPayload -> { + try { + waitFuture2.complete(null); + waitFuture1.get(); + } catch (InterruptedException | ExecutionException e) { + ExceptionUtils.rethrow(e); + } + return null; + }) + .build(); + // Test if consumer thread can get the lock correctly. + CheckedThread consumerThread = + new CheckedThread() { + @Override + public void go() throws Exception { + waitFuture2.get(); + // Get the lock in disk io scheduler. + diskIOScheduler.release(); + waitFuture1.complete(null); + } + }; + consumerThread.start(); + diskIOScheduler.connectionEstablished( + new TieredStorageSubpartitionId(0), nettyConnectionWriter); + ioExecutor.trigger(); + assertThat(waitFuture1).isDone(); + assertThat(waitFuture2).isDone(); + } + private List<Map<Integer, Integer>> createFirstBufferIndexInSegment() { Map<Integer, Integer> firstBufferIndexInSegment0 = new HashMap<>(); Map<Integer, Integer> firstBufferIndexInSegment1 = new HashMap<>();
