This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 7012130 [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
7012130 is described below
commit 7012130d49a9305d996d3afe83ef383d9d9c85c4
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Jan 26 10:42:25 2022 +0800
[FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
In the current sort-shuffle implementation, SortMergeSubpartitionReader and
SortMergeResultPartitionReadScheduler acquire their locks in different orders,
which may cause a deadlock. To solve the problem, this change moves buffer
recycling in SortMergeSubpartitionReader out of the lock.
This closes #18551.
---
.../partition/SortMergeSubpartitionReader.java | 34 ++++++++------
.../SortMergeResultPartitionReadSchedulerTest.java | 53 +++++++++++++++++++++-
2 files changed, 73 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index ba2cd5f..aa44d4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -28,6 +28,8 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -97,21 +99,27 @@ class SortMergeSubpartitionReader
}
private void addBuffer(Buffer buffer) {
- boolean notifyAvailable;
+ boolean notifyAvailable = false;
+ boolean needRecycleBuffer = false;
+
synchronized (lock) {
if (isReleased) {
- buffer.recycleBuffer();
- throw new IllegalStateException("Subpartition reader has been already released.");
- }
-
- notifyAvailable = buffersRead.isEmpty();
+ needRecycleBuffer = true;
+ } else {
+ notifyAvailable = buffersRead.isEmpty();
- buffersRead.add(buffer);
- if (buffer.isBuffer()) {
- ++dataBufferBacklog;
+ buffersRead.add(buffer);
+ if (buffer.isBuffer()) {
+ ++dataBufferBacklog;
+ }
}
}
+ if (needRecycleBuffer) {
+ buffer.recycleBuffer();
+ throw new IllegalStateException("Subpartition reader has been already released.");
+ }
+
if (notifyAvailable) {
notifyDataAvailable();
}
@@ -171,6 +179,7 @@ class SortMergeSubpartitionReader
}
private void releaseInternal(@Nullable Throwable throwable) {
+ List<Buffer> buffersToRecycle;
synchronized (lock) {
if (isReleased) {
return;
@@ -180,12 +189,11 @@ class SortMergeSubpartitionReader
if (failureCause == null) {
failureCause = throwable;
}
-
- for (Buffer buffer : buffersRead) {
- buffer.recycleBuffer();
- }
+ buffersToRecycle = new ArrayList<>(buffersRead);
buffersRead.clear();
}
+ buffersToRecycle.forEach(Buffer::recycleBuffer);
+ buffersToRecycle.clear();
releaseFuture.complete(null);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 5c0f407..49761a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.TestLogger;
@@ -29,7 +31,13 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -58,6 +66,12 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
private PartitionedFile partitionedFile;
+ private PartitionedFileReader fileReader;
+
+ private FileChannel dataFileChannel;
+
+ private FileChannel indexFileChannel;
+
private BatchShuffleReadBufferPool bufferPool;
private ExecutorService executor;
@@ -79,13 +93,19 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
numBuffersPerSubpartition,
bufferSize,
dataBytes);
+ dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
+ indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
+ fileReader =
+ new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel);
bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
executor = Executors.newFixedThreadPool(numThreads);
readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool, executor, this);
}
@After
- public void after() {
+ public void after() throws Exception {
+ dataFileChannel.close();
+ indexFileChannel.close();
partitionedFile.deleteQuietly();
bufferPool.destroy();
executor.shutdown();
@@ -192,6 +212,37 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
assertAllResourcesReleased();
}
+ @Test(timeout = 60000)
+ public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+ SortMergeSubpartitionReader subpartitionReader =
+ new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
+ Thread readAndReleaseThread =
+ new Thread(
+ () -> {
+ Queue<MemorySegment> segments = new ArrayDeque<>();
+ segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+ try {
+ assertTrue(fileReader.hasRemaining());
+ subpartitionReader.readBuffers(segments, readScheduler);
+ subpartitionReader.releaseAllResources();
+ subpartitionReader.readBuffers(segments, readScheduler);
+ } catch (Exception ignore) {
+ }
+ });
+
+ synchronized (this) {
+ readAndReleaseThread.start();
+ do {
+ Thread.sleep(100);
+ } while (!subpartitionReader.isReleased());
+ }
+ readAndReleaseThread.join();
+ }
+
+ private static FileChannel openFileChannel(Path path) throws IOException {
+ return FileChannel.open(path, StandardOpenOption.READ);
+ }
+
private void assertAllResourcesReleased() {
assertNull(readScheduler.getDataFileChannel());
assertNull(readScheduler.getIndexFileChannel());