This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 3eb1075  [FLINK-17823][network] Resolve the race condition while 
releasing RemoteInputChannel
3eb1075 is described below

commit 3eb1075ded64da20e6f7a5bc268f455eaf6573eb
Author: Zhijiang <[email protected]>
AuthorDate: Wed May 20 12:16:56 2020 +0800

    [FLINK-17823][network] Resolve the race condition while releasing 
RemoteInputChannel
    
    RemoteInputChannel#releaseAllResources might be called by the canceler thread.
Meanwhile, the task thread can also call RemoteInputChannel#getNextBuffer.
    This can cause two potential problems:
    
    1. The task thread might get a null buffer after the canceler thread has already
released all the buffers, which might cause a misleading NPE in getNextBuffer.
    2. The task thread and the canceler thread might pull the same buffer concurrently,
which causes an unexpected exception when the same buffer is recycled twice.
    
    The solution is to properly synchronize on the buffer queue in the release method
so that the same buffer cannot be pulled by both the canceler thread and the task thread.
    In addition, the getNextBuffer method gets explicit checks that avoid the
misleading NPE and throw a meaningful exception instead.
---
 .../partition/consumer/RemoteInputChannel.java     | 15 +++++--
 .../partition/consumer/RemoteInputChannelTest.java | 51 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4e1f260..ba8fc11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -168,7 +169,6 @@ public class RemoteInputChannel extends InputChannel {
 
        @Override
        Optional<BufferAndAvailability> getNextBuffer() throws IOException {
-               checkState(!isReleased.get(), "Queried for a buffer after 
channel has been closed.");
                checkState(partitionRequestClient != null, "Queried for a 
buffer before requesting a queue.");
 
                checkError();
@@ -181,6 +181,14 @@ public class RemoteInputChannel extends InputChannel {
                        moreAvailable = !receivedBuffers.isEmpty();
                }
 
+               if (next == null) {
+                       if (isReleased.get()) {
+                               throw new CancelTaskException("Queried for a 
buffer after channel has been released.");
+                       } else {
+                               throw new IllegalStateException("There should 
always have queued buffers for unreleased channel.");
+                       }
+               }
+
                numBytesIn.inc(next.getSize());
                numBuffersIn.inc();
                return Optional.of(new BufferAndAvailability(next, 
moreAvailable, 0));
@@ -242,9 +250,10 @@ public class RemoteInputChannel extends InputChannel {
        void releaseAllResources() throws IOException {
                if (isReleased.compareAndSet(false, true)) {
 
-                       ArrayDeque<Buffer> releasedBuffers;
+                       final ArrayDeque<Buffer> releasedBuffers;
                        synchronized (receivedBuffers) {
-                               releasedBuffers = receivedBuffers;
+                               releasedBuffers = new 
ArrayDeque<>(receivedBuffers);
+                               receivedBuffers.clear();
                        }
                        bufferManager.releaseAllBuffers(releasedBuffers);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index f059df7..b280422 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -1010,6 +1011,56 @@ public class RemoteInputChannelTest {
                }
        }
 
+       @Test
+       public void testConcurrentGetNextBufferAndRelease() throws Exception {
+               final int numTotalBuffers  = 1_000;
+               final int numExclusiveBuffers = 2;
+               final int numFloatingBuffers = 998;
+               final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(numTotalBuffers, 32, numExclusiveBuffers);
+               final SingleInputGate inputGate = createSingleInputGate(1, 
networkBufferPool);
+               final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
+               inputGate.setInputChannels(inputChannel);
+
+               final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+               Throwable thrown = null;
+               try {
+                       BufferPool bufferPool = 
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
+                       inputGate.setBufferPool(bufferPool);
+                       inputGate.assignExclusiveSegments();
+                       inputChannel.requestSubpartition(0);
+
+                       for (int i = 0; i < numTotalBuffers; i++) {
+                               Buffer buffer = inputChannel.requestBuffer();
+                               inputChannel.onBuffer(buffer, i, 0);
+                       }
+
+                       final Callable<Void> getNextBufferTask = () -> {
+                               try {
+                                       for (int i = 0; i < numTotalBuffers; 
++i) {
+                                               
Optional<InputChannel.BufferAndAvailability> bufferAndAvailability = 
inputChannel.getNextBuffer();
+                                               
bufferAndAvailability.ifPresent(buffer -> buffer.buffer().recycleBuffer());
+                                       }
+                               } catch (Throwable t) {
+                                       if (!inputChannel.isReleased()) {
+                                               throw new 
AssertionError("Exceptions are expected here only if the input channel was 
released", t);
+                                       }
+                               }
+                               return null;
+                       };
+
+                       final Callable<Void> releaseTask = () -> {
+                               inputChannel.releaseAllResources();
+                               return null;
+                       };
+
+                       submitTasksAndWaitForResults(executor, new Callable[] 
{getNextBufferTask, releaseTask});
+               } catch (Throwable t) {
+                       thrown = t;
+               } finally {
+                       cleanup(networkBufferPool, executor, null, thrown, 
inputChannel);
+               }
+       }
+
        /**
         * Tests that {@link 
RemoteInputChannel#retriggerSubpartitionRequest(int)} would throw
         * the {@link PartitionNotFoundException} if backoff is 0.

Reply via email to