Repository: flink Updated Branches: refs/heads/release-1.1 0dc82baa0 -> 388acbca9
[FLINK-5228] [network] Fix LocalInputChannel re-trigger request and release deadlock Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/388acbca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/388acbca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/388acbca Branch: refs/heads/release-1.1 Commit: 388acbca98200f13b0c016f9b79d900e355dc5ef Parents: 0dc82ba Author: Ufuk Celebi <[email protected]> Authored: Fri Dec 2 12:52:58 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Fri Dec 2 15:17:55 2016 +0100 ---------------------------------------------------------------------- .../partition/consumer/LocalInputChannel.java | 38 ++++---- .../consumer/LocalInputChannelTest.java | 92 +++++++++++++++++++- 2 files changed, 114 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 0a02ea1..51748b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -49,7 +49,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit // ------------------------------------------------------------------------ - private final Object requestReleaseLock = new Object(); + private final Object requestLock = new Object(); /** The local partition manager. 
*/ private final ResultPartitionManager partitionManager; @@ -99,9 +99,12 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit @Override void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { + + boolean retriggerRequest = false; + // The lock is required to request only once in the presence of retriggered requests. - synchronized (requestReleaseLock) { - checkState(!isReleased, "released"); + synchronized (requestLock) { + checkState(!isReleased, "LocalInputChannel has been released already"); if (subpartitionView == null) { LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.", @@ -125,20 +128,27 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit } } catch (PartitionNotFoundException notFound) { if (increaseBackoff()) { - inputGate.retriggerPartitionRequest(partitionId.getPartitionId()); + retriggerRequest = true; } else { throw notFound; } } } } + + // Do this outside of the lock scope as this might lead to a + // deadlock with a concurrent release of the channel via the + // input gate. + if (retriggerRequest) { + inputGate.retriggerPartitionRequest(partitionId.getPartitionId()); + } } /** * Retriggers a subpartition request. 
*/ void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) { - synchronized (requestReleaseLock) { + synchronized (requestLock) { checkState(subpartitionView == null, "already requested partition"); timer.schedule(new TimerTask() { @@ -193,7 +203,7 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit // synchronizing on the request lock means this blocks until the asynchronous request // for the partition view has been completed // by then the subpartition view is visible or the channel is released - synchronized (requestReleaseLock) { + synchronized (requestLock) { checkState(!isReleased, "released"); checkState(subpartitionView != null, "Queried for a buffer before requesting the subpartition."); return subpartitionView; @@ -231,19 +241,17 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit } /** - * Releases the look ahead {@link Buffer} instance and discards the queue - * iterator. + * Releases the partition reader */ @Override void releaseAllResources() throws IOException { - synchronized (requestReleaseLock) { - if (!isReleased) { - isReleased = true; + if (!isReleased) { + isReleased = true; - if (subpartitionView != null) { - subpartitionView.releaseAllResources(); - subpartitionView = null; - } + ResultSubpartitionView view = subpartitionView; + if (view != null) { + view.releaseAllResources(); + subpartitionView = null; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 9b36ea9..974ac9f 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -29,8 +29,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; -import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -254,6 +254,96 @@ public class LocalInputChannelTest { ch.getNextBuffer(); } + /** + * Verifies that concurrent release via the SingleInputGate and re-triggering + * of a partition request works smoothly. + * + * - SingleInputGate acquires its request lock and tries to release all + * registered channels. When releasing a channel, it needs to acquire + * the channel's shared request-release lock. + * - If a LocalInputChannel concurrently retriggers a partition request via + * a Timer Thread it acquires the channel's request-release lock and calls + * the retrigger callback on the SingleInputGate, which again tries to + * acquire the gate's request lock. + * + * For certain timings this obviously leads to a deadlock. This test reliably + * reproduced such a timing (reported in FLINK-5228). This test is pretty much + * testing the buggy implementation and has not much more general value. If it + * becomes obsolete at some point (future greatness ;)), feel free to remove it. 
+ * + * The fix in the end was to not acquire the channel's lock when releasing it + * and/or not doing any input gate callbacks while holding the channel's lock. + * I decided to do both. + */ + @Test + public void testConcurrentReleaseAndRetriggerPartitionRequest() throws Exception { + final SingleInputGate gate = new SingleInputGate( + "test task name", + new JobID(), + new ExecutionAttemptID(), + new IntermediateDataSetID(), + 0, + 1, + mock(PartitionStateChecker.class), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup() + ); + + ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); + when(partitionManager + .createSubpartitionView( + any(ResultPartitionID.class), + anyInt(), + any(BufferProvider.class), + any(BufferAvailabilityListener.class))) + .thenAnswer(new Answer<ResultSubpartitionView>() { + @Override + public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable { + // Sleep here a little to give the releaser Thread + // time to acquire the input gate lock. We throw + // the Exception to retrigger the request. 
+ Thread.sleep(100); + throw new PartitionNotFoundException(new ResultPartitionID()); + } + }); + + final LocalInputChannel channel = new LocalInputChannel( + gate, + 0, + new ResultPartitionID(), + partitionManager, + new TaskEventDispatcher(), + new Tuple2<>(1, 1), + new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()); + + gate.setInputChannel(new IntermediateResultPartitionID(), channel); + + Thread releaser = new Thread() { + @Override + public void run() { + try { + gate.releaseAllResources(); + } catch (IOException ignored) { + } + } + }; + + Thread requester = new Thread() { + @Override + public void run() { + try { + channel.requestSubpartition(0); + } catch (IOException | InterruptedException ignored) { + } + } + }; + + requester.start(); + releaser.start(); + + releaser.join(); + requester.join(); + } + // --------------------------------------------------------------------------------------------- private LocalInputChannel createLocalInputChannel(
