Repository: flink
Updated Branches:
refs/heads/master 59e55cab3 -> a9b5579b3
[FLINK-9676][network] clarify contracts of
BufferListener#notifyBufferAvailable() and fix a deadlock
A deadlock could occur when exclusive buffers of a RemoteInputChannel are
recycled concurrently with (other/floating) buffers being recycled to the
buffer pool, while the RemoteInputChannel is registered as a listener to that
buffer pool and adding the exclusive buffer triggers a floating buffer to be
recycled back to the same buffer pool. In that case, two threads hold the locks
on LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue,
respectively, and each tries to acquire the other one, i.e. the locks are
acquired in reverse order.
One such instance would be:
Task canceler thread -> RemoteInputChannel1#releaseAllResources
  -> recycle floating buffers
  -> lock(LocalBufferPool#availableMemorySegments)
  -> RemoteInputChannel2#notifyBufferAvailable
  -> try to lock(RemoteInputChannel2#bufferQueue)
Task thread -> RemoteInputChannel2#recycle
  -> lock(RemoteInputChannel2#bufferQueue)
  -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
  -> try to lock(LocalBufferPool#availableMemorySegments)
Therefore, we decouple the listener callback from the lock around
LocalBufferPool#availableMemorySegments and implicitly enforce that
RemoteInputChannel2#bufferQueue takes precedence over this lock, i.e. it must
be acquired first and should never be taken after having locked on
LocalBufferPool#availableMemorySegments.
This closes #6257.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9b5579b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9b5579b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9b5579b
Branch: refs/heads/master
Commit: a9b5579b397b6c56e44e21ebb8b2a6a7e6d8b2d1
Parents: 59e55ca
Author: Nico Kruber <[email protected]>
Authored: Wed Jul 4 17:45:18 2018 +0200
Committer: Nico Kruber <[email protected]>
Committed: Thu Jul 5 12:18:37 2018 +0200
----------------------------------------------------------------------
.../io/network/buffer/LocalBufferPool.java | 56 +++++++++--
.../io/network/buffer/LocalBufferPoolTest.java | 7 +-
.../consumer/RemoteInputChannelTest.java | 99 +++++++++++++++++++-
3 files changed, 147 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a9b5579b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 6f0d991..e874723 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -57,6 +57,12 @@ class LocalBufferPool implements BufferPool {
/**
* The currently available memory segments. These are segments, which
have been requested from
* the network buffer pool and are currently not handed out as Buffer
instances.
+ *
+ * <p><strong>BEWARE:</strong> Take special care with the interactions
between this lock and
+ * locks acquired before entering this class vs. locks being acquired
during calls to external
+ * code inside this class, e.g. with
+ * {@link
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel#bufferQueue}
+ * via the {@link #registeredListeners} callback.
*/
private final ArrayDeque<MemorySegment> availableMemorySegments = new
ArrayDeque<MemorySegment>();
@@ -251,27 +257,56 @@ class LocalBufferPool implements BufferPool {
@Override
public void recycle(MemorySegment segment) {
+ BufferListener listener;
synchronized (availableMemorySegments) {
if (isDestroyed || numberOfRequestedMemorySegments >
currentPoolSize) {
returnMemorySegment(segment);
+ return;
}
else {
- BufferListener listener =
registeredListeners.poll();
+ listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
availableMemorySegments.notify();
+ return;
}
- else {
- try {
- boolean needMoreBuffers =
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
- if (needMoreBuffers) {
-
registeredListeners.add(listener);
- }
+ }
+ }
+
+ // We do not know which locks have been acquired before the
recycle() or are needed in the
+ // notification and which other threads also access them.
+ // -> call notifyBufferAvailable() outside of the synchronized
block to avoid a deadlock (FLINK-9676)
+ boolean success = false;
+ boolean needMoreBuffers = false;
+ try {
+ needMoreBuffers = listener.notifyBufferAvailable(new
NetworkBuffer(segment, this));
+ success = true;
+ } catch (Throwable ignored) {
+ // handled below, under the lock
+ }
+
+ if (!success || needMoreBuffers) {
+ synchronized (availableMemorySegments) {
+ if (isDestroyed) {
+ // cleanup tasks how they would have
been done if we only had one synchronized block
+ if (needMoreBuffers) {
+
listener.notifyBufferDestroyed();
}
- catch (Throwable ignored) {
-
availableMemorySegments.add(segment);
-
availableMemorySegments.notify();
+ if (!success) {
+ returnMemorySegment(segment);
+ }
+ } else {
+ if (needMoreBuffers) {
+
registeredListeners.add(listener);
+ }
+ if (!success) {
+ if
(numberOfRequestedMemorySegments > currentPoolSize) {
+
returnMemorySegment(segment);
+ } else {
+
availableMemorySegments.add(segment);
+
availableMemorySegments.notify();
+ }
}
}
}
@@ -283,6 +318,7 @@ class LocalBufferPool implements BufferPool {
*/
@Override
public void lazyDestroy() {
+ // NOTE: if you change this logic, be sure to update recycle()
as well!
synchronized (availableMemorySegments) {
if (!isDestroyed) {
MemorySegment segment;
http://git-wip-us.apache.org/repos/asf/flink/blob/a9b5579b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 303a6a9..7a6fe6a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -407,13 +408,13 @@ public class LocalBufferPoolTest extends TestLogger {
private BufferListener createBufferListener(int notificationTimes) {
return spy(new BufferListener() {
- int times = 0;
+ AtomicInteger times = new AtomicInteger(0);
@Override
public boolean notifyBufferAvailable(Buffer buffer) {
- times++;
+ int newCount = times.incrementAndGet();
buffer.recycleBuffer();
- return times < notificationTimes;
+ return newCount < notificationTimes;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a9b5579b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 802cb93..4080106 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -33,6 +33,7 @@ import
org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -52,6 +53,7 @@ import scala.Tuple2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -62,6 +64,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the {@link RemoteInputChannel}.
+ */
public class RemoteInputChannelTest {
@Test
@@ -804,7 +809,7 @@ public class RemoteInputChannelTest {
recycleFloatingBufferTask(bufferPool,
numFloatingBuffers),
requestBufferTask});
- assertEquals("There should be " +
inputChannel.getNumberOfRequiredBuffers() +" buffers available in channel.",
+ assertEquals("There should be " +
inputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.",
inputChannel.getNumberOfRequiredBuffers(),
inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be no buffers available in
local pool.",
0,
bufferPool.getNumberOfAvailableMemorySegments());
@@ -878,6 +883,95 @@ public class RemoteInputChannelTest {
}
}
+ /**
+ * Tests to verify that there is no race condition with two things
running in parallel:
+ * recycling exclusive buffers and recycling external buffers to the
buffer pool while the
+ * recycling of the exclusive buffer triggers recycling a floating
buffer (FLINK-9676).
+ */
+ @Test
+ public void testConcurrentRecycleAndRelease2() throws Exception {
+ // Setup
+ final int retries = 1_000;
+ final int numExclusiveBuffers = 2;
+ final int numFloatingBuffers = 2;
+ final int numTotalBuffers = numExclusiveBuffers +
numFloatingBuffers;
+ final NetworkBufferPool networkBufferPool = new
NetworkBufferPool(
+ numTotalBuffers, 32);
+
+ final ExecutorService executor =
Executors.newFixedThreadPool(2);
+
+ final SingleInputGate inputGate = createSingleInputGate();
+ final RemoteInputChannel inputChannel =
createRemoteInputChannel(inputGate);
+
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(),
inputChannel);
+ try {
+ final BufferPool bufferPool =
networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
+ inputGate.setBufferPool(bufferPool);
+ inputGate.assignExclusiveSegments(networkBufferPool,
numExclusiveBuffers);
+ inputChannel.requestSubpartition(0);
+
+ final Callable<Void> bufferPoolInteractionsTask = () ->
{
+ for (int i = 0; i < retries; ++i) {
+ Buffer buffer =
bufferPool.requestBufferBlocking();
+ buffer.recycleBuffer();
+ }
+ return null;
+ };
+
+ final Callable<Void> channelInteractionsTask = () -> {
+ ArrayList<Buffer> exclusiveBuffers = new
ArrayList<>(numExclusiveBuffers);
+ ArrayList<Buffer> floatingBuffers = new
ArrayList<>(numExclusiveBuffers);
+ try {
+ for (int i = 0; i < retries; ++i) {
+ // note: we may still have a
listener on the buffer pool and receive
+ // floating buffers as soon as
we take exclusive ones
+ for (int j = 0; j <
numTotalBuffers; ++j) {
+ Buffer buffer =
inputChannel.requestBuffer();
+ if (buffer == null) {
+ break;
+ } else {
+ //noinspection
ObjectEquality
+ if
(buffer.getRecycler() == inputChannel) {
+
exclusiveBuffers.add(buffer);
+ } else {
+
floatingBuffers.add(buffer);
+ }
+ }
+ }
+ // recycle excess floating
buffers (will go back into the channel)
+
floatingBuffers.forEach(Buffer::recycleBuffer);
+ floatingBuffers.clear();
+
+
assertEquals(numExclusiveBuffers, exclusiveBuffers.size());
+
inputChannel.onSenderBacklog(0); // trigger subscription to buffer pool
+ // note: if we got a floating
buffer by increasing the backlog, it will be released again when recycling the
exclusive buffer, if not, we should release it once we get it
+
exclusiveBuffers.forEach(Buffer::recycleBuffer);
+ exclusiveBuffers.clear();
+ }
+ } finally {
+ inputChannel.releaseAllResources();
+ }
+
+ return null;
+ };
+
+ // Submit tasks and wait to finish
+ submitTasksAndWaitForResults(executor,
+ new Callable[] {bufferPoolInteractionsTask,
channelInteractionsTask});
+ } catch (Throwable t) {
+ inputChannel.releaseAllResources();
+
+ try {
+ networkBufferPool.destroyAllBufferPools();
+ } catch (Throwable tInner) {
+ t.addSuppressed(tInner);
+ }
+
+ networkBufferPool.destroy();
+ executor.shutdown();
+ ExceptionUtils.rethrowException(t);
+ }
+ }
+
//
---------------------------------------------------------------------------------------------
private SingleInputGate createSingleInputGate() {
@@ -986,7 +1080,8 @@ public class RemoteInputChannelTest {
private void submitTasksAndWaitForResults(ExecutorService executor,
Callable[] tasks) throws Exception {
final List<Future> results =
Lists.newArrayListWithCapacity(tasks.length);
- for(Callable task : tasks) {
+ for (Callable task : tasks) {
+ //noinspection unchecked
results.add(executor.submit(task));
}