This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 3eb1075 [FLINK-17823][network] Resolve the race condition while
releasing RemoteInputChannel
3eb1075 is described below
commit 3eb1075ded64da20e6f7a5bc268f455eaf6573eb
Author: Zhijiang <[email protected]>
AuthorDate: Wed May 20 12:16:56 2020 +0800
[FLINK-17823][network] Resolve the race condition while releasing
RemoteInputChannel
RemoteInputChannel#releaseAllResources might be called by canceler thread.
Meanwhile, the task thread can also call RemoteInputChannel#getNextBuffer.
This can cause two potential problems:
1. The task thread might get a null buffer after the canceler thread has already
released all the buffers, which might cause a misleading NPE in getNextBuffer.
2. The task thread and the canceler thread might pull the same buffer concurrently,
which causes an unexpected exception when the same buffer is recycled twice.
The solution is to properly synchronize on the buffer queue in the release method,
so that the same buffer cannot be pulled by both the canceler thread and the task thread.
In the getNextBuffer method, we add explicit checks to avoid the
misleading NPE and throw meaningful exceptions instead.
---
.../partition/consumer/RemoteInputChannel.java | 15 +++++--
.../partition/consumer/RemoteInputChannelTest.java | 51 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 3 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4e1f260..ba8fc11 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -168,7 +169,6 @@ public class RemoteInputChannel extends InputChannel {
@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
- checkState(!isReleased.get(), "Queried for a buffer after
channel has been closed.");
checkState(partitionRequestClient != null, "Queried for a
buffer before requesting a queue.");
checkError();
@@ -181,6 +181,14 @@ public class RemoteInputChannel extends InputChannel {
moreAvailable = !receivedBuffers.isEmpty();
}
+ if (next == null) {
+ if (isReleased.get()) {
+ throw new CancelTaskException("Queried for a
buffer after channel has been released.");
+ } else {
+ throw new IllegalStateException("There should
always have queued buffers for unreleased channel.");
+ }
+ }
+
numBytesIn.inc(next.getSize());
numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next,
moreAvailable, 0));
@@ -242,9 +250,10 @@ public class RemoteInputChannel extends InputChannel {
void releaseAllResources() throws IOException {
if (isReleased.compareAndSet(false, true)) {
- ArrayDeque<Buffer> releasedBuffers;
+ final ArrayDeque<Buffer> releasedBuffers;
synchronized (receivedBuffers) {
- releasedBuffers = receivedBuffers;
+ releasedBuffers = new
ArrayDeque<>(receivedBuffers);
+ receivedBuffers.clear();
}
bufferManager.releaseAllBuffers(releasedBuffers);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index f059df7..b280422 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -58,6 +58,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -1010,6 +1011,56 @@ public class RemoteInputChannelTest {
}
}
+ @Test
+ public void testConcurrentGetNextBufferAndRelease() throws Exception {
+ final int numTotalBuffers = 1_000;
+ final int numExclusiveBuffers = 2;
+ final int numFloatingBuffers = 998;
+ final NetworkBufferPool networkBufferPool = new
NetworkBufferPool(numTotalBuffers, 32, numExclusiveBuffers);
+ final SingleInputGate inputGate = createSingleInputGate(1,
networkBufferPool);
+ final RemoteInputChannel inputChannel =
createRemoteInputChannel(inputGate);
+ inputGate.setInputChannels(inputChannel);
+
+ final ExecutorService executor =
Executors.newFixedThreadPool(2);
+ Throwable thrown = null;
+ try {
+ BufferPool bufferPool =
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
+ inputGate.setBufferPool(bufferPool);
+ inputGate.assignExclusiveSegments();
+ inputChannel.requestSubpartition(0);
+
+ for (int i = 0; i < numTotalBuffers; i++) {
+ Buffer buffer = inputChannel.requestBuffer();
+ inputChannel.onBuffer(buffer, i, 0);
+ }
+
+ final Callable<Void> getNextBufferTask = () -> {
+ try {
+ for (int i = 0; i < numTotalBuffers;
++i) {
+
Optional<InputChannel.BufferAndAvailability> bufferAndAvailability =
inputChannel.getNextBuffer();
+
bufferAndAvailability.ifPresent(buffer -> buffer.buffer().recycleBuffer());
+ }
+ } catch (Throwable t) {
+ if (!inputChannel.isReleased()) {
+ throw new
AssertionError("Exceptions are expected here only if the input channel was
released", t);
+ }
+ }
+ return null;
+ };
+
+ final Callable<Void> releaseTask = () -> {
+ inputChannel.releaseAllResources();
+ return null;
+ };
+
+ submitTasksAndWaitForResults(executor, new Callable[]
{getNextBufferTask, releaseTask});
+ } catch (Throwable t) {
+ thrown = t;
+ } finally {
+ cleanup(networkBufferPool, executor, null, thrown,
inputChannel);
+ }
+ }
+
/**
* Tests that {@link
RemoteInputChannel#retriggerSubpartitionRequest(int)} would throw
* the {@link PartitionNotFoundException} if backoff is 0.