Repository: flink
Updated Branches:
  refs/heads/release-1.5 f857156ce -> 6ce8211ba


[FLINK-9676][network] clarify contracts of BufferListener#notifyBufferAvailable() and fix a deadlock

A deadlock could occur when two things happened concurrently: exclusive
buffers of a RemoteInputChannel were being recycled, and other (floating)
buffers were being recycled to the buffer pool. This required the
RemoteInputChannel to be registered as a listener on that buffer pool, and
adding the exclusive buffer had to trigger a floating buffer to be recycled
back to the same pool. The two threads then held the locks on
LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue,
respectively, and each tried to acquire the other one. In other words, they
acquired the two locks in reverse order.

One such instance would be:

Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle floating buffers
        -> lock(LocalBufferPool#availableMemorySegments) -> RemoteInputChannel2#notifyBufferAvailable
        -> try to lock(RemoteInputChannel2#bufferQueue)

Task thread -> RemoteInputChannel2#recycle
        -> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
        -> try to lock(LocalBufferPool#availableMemorySegments)
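The following self-contained Java sketch shows the lock-order inversion in isolation. It is not Flink code. Two ReentrantLocks stand in for LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue. Each thread takes its second lock with tryLock() and a timeout, so the sketch reports the deadlock-prone interleaving instead of hanging. The class and method names are hypothetical.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of Flink.
class LockOrderInversionDemo {

    /**
     * Mimics the two threads from the trace: the "canceler" locks the pool first and the
     * "task" locks the queue first. Each one then tries to take the other lock while
     * still holding its own. Returns true if neither thread got its second lock, i.e. if
     * blocking lock() calls would have deadlocked.
     */
    static boolean wouldDeadlock() throws InterruptedException {
        ReentrantLock poolLock = new ReentrantLock();  // stands in for availableMemorySegments
        ReentrantLock queueLock = new ReentrantLock(); // stands in for bufferQueue
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean cancelerBlocked = new AtomicBoolean();
        AtomicBoolean taskBlocked = new AtomicBoolean();

        Thread canceler = new Thread(() -> lockThenTryLock(poolLock, queueLock, barrier, cancelerBlocked));
        Thread task = new Thread(() -> lockThenTryLock(queueLock, poolLock, barrier, taskBlocked));
        canceler.start();
        task.start();
        canceler.join();
        task.join();
        return cancelerBlocked.get() && taskBlocked.get();
    }

    private static void lockThenTryLock(
            ReentrantLock first, ReentrantLock second, CyclicBarrier barrier, AtomicBoolean blocked) {
        first.lock();
        try {
            barrier.await(); // both threads now hold their first lock
            if (second.tryLock(100, TimeUnit.MILLISECONDS)) {
                second.unlock();
            } else {
                blocked.set(true);
            }
            barrier.await(); // keep the first lock until the other thread has tried, too
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlock-prone interleaving detected: " + wouldDeadlock());
    }
}
```

The second barrier matters here. It keeps both first locks held until both tryLock() attempts have timed out, so the result does not depend on timing.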

Therefore, we decouple the listener callback from the lock around
LocalBufferPool#availableMemorySegments and implicitly enforce that
RemoteInputChannel#bufferQueue takes precedence over this lock. It must be
acquired first and must never be taken after having locked on
LocalBufferPool#availableMemorySegments.
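A small, self-contained sketch of the same idea follows. The names are hypothetical and these are not the Flink classes. The pool polls a listener while holding its lock, invokes the callback only after releasing that lock, and re-registers the listener under the lock afterwards. A listener that guards its state with its own lock, as RemoteInputChannel does with bufferQueue, can then be called while another thread holds that lock and recycles into the pool, and no lock-order inversion occurs. Unlike the real LocalBufferPool#recycle(), the sketch ignores pool destruction, pool-size limits and exceptions thrown by the callback.

```java
import java.util.ArrayDeque;

// Hypothetical callback, loosely modelled on BufferListener#notifyBufferAvailable().
interface SegmentListener {
    /** Returns true if the listener wants to be notified again. */
    boolean onSegmentAvailable(int segment);
}

// Hypothetical toy pool; "segments" are plain ints, not Flink MemorySegments.
class ToyPool {
    private final ArrayDeque<Integer> available = new ArrayDeque<>();
    private final ArrayDeque<SegmentListener> listeners = new ArrayDeque<>();

    void register(SegmentListener listener) {
        synchronized (available) {
            listeners.add(listener);
        }
    }

    int availableCount() {
        synchronized (available) {
            return available.size();
        }
    }

    void recycle(int segment) {
        SegmentListener listener;
        synchronized (available) {
            listener = listeners.poll();
            if (listener == null) {
                available.add(segment);
                return;
            }
        }
        // The pool lock is NOT held here, so the listener may take its own locks.
        if (listener.onSegmentAvailable(segment)) {
            synchronized (available) {
                listeners.add(listener);
            }
        }
    }
}

// Hypothetical listener that guards its state with its own lock.
class QueueListener implements SegmentListener {
    final Object bufferQueue = new Object();
    private int received; // guarded by bufferQueue

    @Override
    public boolean onSegmentAvailable(int segment) {
        synchronized (bufferQueue) {
            received++;
        }
        return true; // keep listening
    }

    int received() {
        synchronized (bufferQueue) {
            return received;
        }
    }
}

class ToyPoolDemo {
    /** Returns how many recycled segments were handed out or pooled, or -1 if the threads got stuck. */
    static int handledSegments(int iterations) throws InterruptedException {
        ToyPool pool = new ToyPool();
        QueueListener listener = new QueueListener();
        pool.register(listener);

        // Lock order in this thread: bufferQueue first, then the pool lock inside recycle().
        Thread task = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                synchronized (listener.bufferQueue) {
                    pool.recycle(i);
                }
            }
        });
        // This thread never holds the pool lock while waiting for bufferQueue.
        Thread canceler = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                pool.recycle(i);
            }
        });
        task.setDaemon(true);
        canceler.setDaemon(true);
        task.start();
        canceler.start();
        task.join(10_000);
        canceler.join(10_000);
        if (task.isAlive() || canceler.isAlive()) {
            return -1;
        }
        return listener.received() + pool.availableCount();
    }
}
```

Each recycle() call either reaches the listener or lands in the pool. With two threads doing `iterations` calls each, the result should therefore be `2 * iterations`.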

This closes #6257.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ce8211b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ce8211b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ce8211b

Branch: refs/heads/release-1.5
Commit: 6ce8211bab84308a17043dc4901d6a93b2777da8
Parents: f857156
Author: Nico Kruber
Authored: Wed Jul 4 17:45:18 2018 +0200
Committer: Nico Kruber
Committed: Thu Jul 5 12:20:23 2018 +0200

----------------------------------------------------------------------
 .../io/network/buffer/LocalBufferPool.java      | 56 +++++++++--
 .../io/network/buffer/LocalBufferPoolTest.java  |  7 +-
 .../consumer/RemoteInputChannelTest.java        | 99 +++++++++++++++++++-
 3 files changed, 147 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ce8211b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 7d9aa21..77eb601 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -57,6 +57,12 @@ class LocalBufferPool implements BufferPool {
        /**
         * The currently available memory segments. These are segments, which have been requested from
         * the network buffer pool and are currently not handed out as Buffer instances.
+        *
+        * <p><strong>BEWARE:</strong> Take special care with the interactions between this lock and
+        * locks acquired before entering this class vs. locks being acquired during calls to external
+        * code inside this class, e.g. with
+        * {@link org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#bufferQueue}
+        * via the {@link #registeredListeners} callback.
         */
        private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
 
@@ -251,27 +257,56 @@ class LocalBufferPool implements BufferPool {
 
        @Override
        public void recycle(MemorySegment segment) {
+               BufferListener listener;
                synchronized (availableMemorySegments) {
                        if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
                                returnMemorySegment(segment);
+                               return;
                        }
                        else {
-                               BufferListener listener = registeredListeners.poll();
+                               listener = registeredListeners.poll();

                                if (listener == null) {
                                        availableMemorySegments.add(segment);
                                        availableMemorySegments.notify();
+                                       return;
                                }
-                               else {
-                                       try {
-                                               boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-                                               if (needMoreBuffers) {
-                                                       registeredListeners.add(listener);
-                                               }
+                       }
+               }
+
+               // We do not know which locks have been acquired before the recycle() or are needed in the
+               // notification and which other threads also access them.
+               // -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+               boolean success = false;
+               boolean needMoreBuffers = false;
+               try {
+                       needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+                       success = true;
+               } catch (Throwable ignored) {
+                       // handled below, under the lock
+               }
+
+               if (!success || needMoreBuffers) {
+                       synchronized (availableMemorySegments) {
+                               if (isDestroyed) {
+                                       // cleanup tasks how they would have been done if we only had one synchronized block
+                                       if (needMoreBuffers) {
+                                               listener.notifyBufferDestroyed();
                                        }
-                                       catch (Throwable ignored) {
-                                               availableMemorySegments.add(segment);
-                                               availableMemorySegments.notify();
+                                       if (!success) {
+                                               returnMemorySegment(segment);
+                                       }
+                               } else {
+                                       if (needMoreBuffers) {
+                                               registeredListeners.add(listener);
+                                       }
+                                       if (!success) {
+                                               if (numberOfRequestedMemorySegments > currentPoolSize) {
+                                                       returnMemorySegment(segment);
+                                               } else {
+                                                       availableMemorySegments.add(segment);
+                                                       availableMemorySegments.notify();
+                                               }
                                        }
                                }
                        }
@@ -283,6 +318,7 @@ class LocalBufferPool implements BufferPool {
         */
        @Override
        public void lazyDestroy() {
+               // NOTE: if you change this logic, be sure to update recycle() as well!
                synchronized (availableMemorySegments) {
                        if (!isDestroyed) {
                                MemorySegment segment;

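After the callback returns, the new recycle() code re-enters the synchronized block only if the callback threw or asked for more buffers. The branches are easier to check when written as a small decision function. The following sketch uses hypothetical enum and method names, not Flink API, and mirrors the calls made in that second synchronized block. Here `poolOverSubscribed` stands for `numberOfRequestedMemorySegments > currentPoolSize`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for the calls made in the second synchronized block of recycle().
enum RecycleAction { NOTIFY_BUFFER_DESTROYED, RETURN_SEGMENT, RE_REGISTER_LISTENER, ADD_TO_AVAILABLE }

class RecycleFollowUp {
    /**
     * @param success            whether notifyBufferAvailable() returned normally
     * @param needMoreBuffers    its return value (stays false if it threw)
     * @param isDestroyed        whether the pool was destroyed in the meantime
     * @param poolOverSubscribed numberOfRequestedMemorySegments > currentPoolSize
     */
    static List<RecycleAction> decide(
            boolean success, boolean needMoreBuffers, boolean isDestroyed, boolean poolOverSubscribed) {
        List<RecycleAction> actions = new ArrayList<>();
        if (success && !needMoreBuffers) {
            return actions; // listener took the buffer and is done: no second lock needed
        }
        if (isDestroyed) {
            if (needMoreBuffers) {
                actions.add(RecycleAction.NOTIFY_BUFFER_DESTROYED);
            }
            if (!success) {
                actions.add(RecycleAction.RETURN_SEGMENT);
            }
        } else {
            if (needMoreBuffers) {
                actions.add(RecycleAction.RE_REGISTER_LISTENER);
            }
            if (!success) {
                actions.add(poolOverSubscribed ? RecycleAction.RETURN_SEGMENT : RecycleAction.ADD_TO_AVAILABLE);
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        // callback threw while the pool is alive and within its size: segment goes back to the pool
        System.out.println(decide(false, false, false, false)); // [ADD_TO_AVAILABLE]
    }
}
```

In the real code, `needMoreBuffers` can only be true if the callback returned normally. The combination `!success && needMoreBuffers` therefore never occurs.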
http://git-wip-us.apache.org/repos/asf/flink/blob/6ce8211b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index b04286e..8834291 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -405,13 +406,13 @@ public class LocalBufferPoolTest {
 
        private BufferListener createBufferListener(int notificationTimes) {
                return spy(new BufferListener() {
-                       int times = 0;
+                       AtomicInteger times = new AtomicInteger(0);
 
                        @Override
                        public boolean notifyBufferAvailable(Buffer buffer) {
-                               times++;
+                               int newCount = times.incrementAndGet();
                                buffer.recycleBuffer();
-                               return times < notificationTimes;
+                               return newCount < notificationTimes;
                        }
 
                        @Override

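With notifyBufferAvailable() no longer running under the pool lock, the test listener's counter is presumably no longer protected by that lock. This is likely why the commit switches it from a plain `int` to an `AtomicInteger`. The standalone sketch below has the same counting contract and shows that `incrementAndGet()` stays exact under concurrent calls. The class name is hypothetical, and the real listener also recycles the buffer it receives.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the listener created by createBufferListener(notificationTimes).
class CountingListener {
    private final AtomicInteger times = new AtomicInteger(0);
    private final int notificationTimes;

    CountingListener(int notificationTimes) {
        this.notificationTimes = notificationTimes;
    }

    /** Same contract as the test: ask for more buffers until notificationTimes calls were seen. */
    boolean notifyBufferAvailable() {
        int newCount = times.incrementAndGet(); // atomic read-modify-write, unlike times++ on an int
        return newCount < notificationTimes;
    }

    int count() {
        return times.get();
    }

    /** Calls one listener from several threads at once and returns the final count. */
    static int countConcurrently(int threads, int callsPerThread) throws InterruptedException {
        CountingListener listener = new CountingListener(Integer.MAX_VALUE);
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    listener.notifyBufferAvailable();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        return listener.count();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countConcurrently(4, 10_000)); // 40000
    }
}
```

The return value is computed from `newCount`, the value this call produced, not from a second read of the counter. A concurrent increment can therefore not change the answer of an earlier call.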
http://git-wip-us.apache.org/repos/asf/flink/blob/6ce8211b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 802cb93..4080106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -52,6 +53,7 @@ import scala.Tuple2;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -62,6 +64,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link RemoteInputChannel}.
+ */
 public class RemoteInputChannelTest {
 
        @Test
@@ -804,7 +809,7 @@ public class RemoteInputChannelTest {
                                recycleFloatingBufferTask(bufferPool, numFloatingBuffers),
                                requestBufferTask});

-                       assertEquals("There should be " + inputChannel.getNumberOfRequiredBuffers() +" buffers available in channel.",
+                       assertEquals("There should be " + inputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.",
                                inputChannel.getNumberOfRequiredBuffers(), inputChannel.getNumberOfAvailableBuffers());
                        assertEquals("There should be no buffers available in local pool.",
                                0, bufferPool.getNumberOfAvailableMemorySegments());
@@ -878,6 +883,95 @@ public class RemoteInputChannelTest {
                }
        }
 
+       /**
+        * Tests to verify that there is no race condition with two things running in parallel:
+        * recycling exclusive buffers and recycling external buffers to the buffer pool while the
+        * recycling of the exclusive buffer triggers recycling a floating buffer (FLINK-9676).
+        */
+       @Test
+       public void testConcurrentRecycleAndRelease2() throws Exception {
+               // Setup
+               final int retries = 1_000;
+               final int numExclusiveBuffers = 2;
+               final int numFloatingBuffers = 2;
+               final int numTotalBuffers = numExclusiveBuffers + numFloatingBuffers;
+               final NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+                       numTotalBuffers, 32);
+
+               final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+               final SingleInputGate inputGate = createSingleInputGate();
+               final RemoteInputChannel inputChannel  = createRemoteInputChannel(inputGate);
+               inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
+               try {
+                       final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
+                       inputGate.setBufferPool(bufferPool);
+                       inputGate.assignExclusiveSegments(networkBufferPool, numExclusiveBuffers);
+                       inputChannel.requestSubpartition(0);
+
+                       final Callable<Void> bufferPoolInteractionsTask = () -> {
+                               for (int i = 0; i < retries; ++i) {
+                                       Buffer buffer = bufferPool.requestBufferBlocking();
+                                       buffer.recycleBuffer();
+                               }
+                               return null;
+                       };
+
+                       final Callable<Void> channelInteractionsTask = () -> {
+                               ArrayList<Buffer> exclusiveBuffers = new ArrayList<>(numExclusiveBuffers);
+                               ArrayList<Buffer> floatingBuffers = new ArrayList<>(numExclusiveBuffers);
+                               try {
+                                       for (int i = 0; i < retries; ++i) {
+                                               // note: we may still have a listener on the buffer pool and receive
+                                               // floating buffers as soon as we take exclusive ones
+                                               for (int j = 0; j < numTotalBuffers; ++j) {
+                                                       Buffer buffer = inputChannel.requestBuffer();
+                                                       if (buffer == null) {
+                                                               break;
+                                                       } else {
+                                                               //noinspection ObjectEquality
+                                                               if (buffer.getRecycler() == inputChannel) {
+                                                                       exclusiveBuffers.add(buffer);
+                                                               } else {
+                                                                       floatingBuffers.add(buffer);
+                                                               }
+                                                       }
+                                               }
+                                               // recycle excess floating buffers (will go back into the channel)
+                                               floatingBuffers.forEach(Buffer::recycleBuffer);
+                                               floatingBuffers.clear();
+
+                                               assertEquals(numExclusiveBuffers, exclusiveBuffers.size());
+                                               inputChannel.onSenderBacklog(0); // trigger subscription to buffer pool
+                                               // note: if we got a floating buffer by increasing the backlog, it will be released again when recycling the exclusive buffer, if not, we should release it once we get it
+                                               exclusiveBuffers.forEach(Buffer::recycleBuffer);
+                                               exclusiveBuffers.clear();
+                                       }
+                               } finally {
+                                       inputChannel.releaseAllResources();
+                               }
+
+                               return null;
+                       };
+
+                       // Submit tasks and wait to finish
+                       submitTasksAndWaitForResults(executor,
+                               new Callable[] {bufferPoolInteractionsTask, channelInteractionsTask});
+               } catch (Throwable t) {
+                       inputChannel.releaseAllResources();
+
+                       try {
+                               networkBufferPool.destroyAllBufferPools();
+                       } catch (Throwable tInner) {
+                               t.addSuppressed(tInner);
+                       }
+
+                       networkBufferPool.destroy();
+                       executor.shutdown();
+                       ExceptionUtils.rethrowException(t);
+               }
+       }
+
        // ---------------------------------------------------------------------------------------------
 
        private SingleInputGate createSingleInputGate() {
@@ -986,7 +1080,8 @@ public class RemoteInputChannelTest {
        private void submitTasksAndWaitForResults(ExecutorService executor, Callable[] tasks) throws Exception {
                final List<Future> results = Lists.newArrayListWithCapacity(tasks.length);
 
-               for(Callable task : tasks) {
+               for (Callable task : tasks) {
+                       //noinspection unchecked
                        results.add(executor.submit(task));
                }
 

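This mail cuts off the body of submitTasksAndWaitForResults() after the submit loop. The following hypothetical helper, which is not the Flink test utility, illustrates the stress-test pattern used by testConcurrentRecycleAndRelease2() on its own. It submits all tasks to a pool of daemon threads and waits for each future against a common deadline. A deadlock regression therefore surfaces as a TimeoutException instead of a hanging build. The helper also shuts the executor down on both success and failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink's test utilities.
class ConcurrentStress {
    /**
     * Runs all tasks in parallel and waits for them until a shared deadline.
     * Throws TimeoutException if a task is still running (e.g. deadlocked) at the deadline,
     * or ExecutionException for the first failed task in submission order.
     */
    static void runAll(List<Callable<Void>> tasks, long timeoutMillis) throws Exception {
        // Daemon threads: a deadlocked worker cannot keep the JVM alive after the test.
        ExecutorService executor = Executors.newFixedThreadPool(tasks.size(), runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (Callable<Void> task : tasks) {
                futures.add(executor.submit(task));
            }
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            for (Future<Void> future : futures) {
                long remaining = Math.max(0L, deadline - System.nanoTime());
                future.get(remaining, TimeUnit.NANOSECONDS);
            }
        } finally {
            executor.shutdownNow(); // interrupts workers that are still running
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<Void> noop = () -> null;
        List<Callable<Void>> tasks = new ArrayList<>();
        tasks.add(noop);
        tasks.add(noop);
        runAll(tasks, 1_000);
        System.out.println("all tasks finished");
    }
}
```

A thread blocked on a `synchronized` monitor does not react to interruption. shutdownNow() therefore cannot free a truly deadlocked worker, which is why the sketch relies on daemon threads for cleanup.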