This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 7012130  [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
7012130 is described below

commit 7012130d49a9305d996d3afe83ef383d9d9c85c4
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Jan 26 10:42:25 2022 +0800

    [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
    
    In the current sort-shuffle implementation, SortMergeSubpartitionReader
    and SortMergeResultPartitionReadScheduler use different lock orders,
    which may cause a deadlock. To solve the problem, this change moves
    buffer recycling in SortMergeSubpartitionReader out of the lock.
    
    This closes #18551.
---
 .../partition/SortMergeSubpartitionReader.java     | 34 ++++++++------
 .../SortMergeResultPartitionReadSchedulerTest.java | 53 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index ba2cd5f..aa44d4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -28,6 +28,8 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 
@@ -97,21 +99,27 @@ class SortMergeSubpartitionReader
     }
 
     private void addBuffer(Buffer buffer) {
-        boolean notifyAvailable;
+        boolean notifyAvailable = false;
+        boolean needRecycleBuffer = false;
+
         synchronized (lock) {
             if (isReleased) {
-                buffer.recycleBuffer();
-                throw new IllegalStateException("Subpartition reader has been 
already released.");
-            }
-
-            notifyAvailable = buffersRead.isEmpty();
+                needRecycleBuffer = true;
+            } else {
+                notifyAvailable = buffersRead.isEmpty();
 
-            buffersRead.add(buffer);
-            if (buffer.isBuffer()) {
-                ++dataBufferBacklog;
+                buffersRead.add(buffer);
+                if (buffer.isBuffer()) {
+                    ++dataBufferBacklog;
+                }
             }
         }
 
+        if (needRecycleBuffer) {
+            buffer.recycleBuffer();
+            throw new IllegalStateException("Subpartition reader has been 
already released.");
+        }
+
         if (notifyAvailable) {
             notifyDataAvailable();
         }
@@ -171,6 +179,7 @@ class SortMergeSubpartitionReader
     }
 
     private void releaseInternal(@Nullable Throwable throwable) {
+        List<Buffer> buffersToRecycle;
         synchronized (lock) {
             if (isReleased) {
                 return;
@@ -180,12 +189,11 @@ class SortMergeSubpartitionReader
             if (failureCause == null) {
                 failureCause = throwable;
             }
-
-            for (Buffer buffer : buffersRead) {
-                buffer.recycleBuffer();
-            }
+            buffersToRecycle = new ArrayList<>(buffersRead);
             buffersRead.clear();
         }
+        buffersToRecycle.forEach(Buffer::recycleBuffer);
+        buffersToRecycle.clear();
 
         releaseFuture.complete(null);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 5c0f407..49761a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.TestLogger;
@@ -29,7 +31,13 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -58,6 +66,12 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
 
     private PartitionedFile partitionedFile;
 
+    private PartitionedFileReader fileReader;
+
+    private FileChannel dataFileChannel;
+
+    private FileChannel indexFileChannel;
+
     private BatchShuffleReadBufferPool bufferPool;
 
     private ExecutorService executor;
@@ -79,13 +93,19 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
                         numBuffersPerSubpartition,
                         bufferSize,
                         dataBytes);
+        dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
+        indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
+        fileReader =
+                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, 
indexFileChannel);
         bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
         executor = Executors.newFixedThreadPool(numThreads);
         readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool, 
executor, this);
     }
 
     @After
-    public void after() {
+    public void after() throws Exception {
+        dataFileChannel.close();
+        indexFileChannel.close();
         partitionedFile.deleteQuietly();
         bufferPool.destroy();
         executor.shutdown();
@@ -192,6 +212,37 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         assertAllResourcesReleased();
     }
 
+    @Test(timeout = 60000)
+    public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+        SortMergeSubpartitionReader subpartitionReader =
+                new SortMergeSubpartitionReader(new 
NoOpBufferAvailablityListener(), fileReader);
+        Thread readAndReleaseThread =
+                new Thread(
+                        () -> {
+                            Queue<MemorySegment> segments = new ArrayDeque<>();
+                            
segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+                            try {
+                                assertTrue(fileReader.hasRemaining());
+                                subpartitionReader.readBuffers(segments, 
readScheduler);
+                                subpartitionReader.releaseAllResources();
+                                subpartitionReader.readBuffers(segments, 
readScheduler);
+                            } catch (Exception ignore) {
+                            }
+                        });
+
+        synchronized (this) {
+            readAndReleaseThread.start();
+            do {
+                Thread.sleep(100);
+            } while (!subpartitionReader.isReleased());
+        }
+        readAndReleaseThread.join();
+    }
+
+    private static FileChannel openFileChannel(Path path) throws IOException {
+        return FileChannel.open(path, StandardOpenOption.READ);
+    }
+
     private void assertAllResourcesReleased() {
         assertNull(readScheduler.getDataFileChannel());
         assertNull(readScheduler.getIndexFileChannel());

Reply via email to