Repository: flink
Updated Branches:
  refs/heads/release-1.1 0dc82baa0 -> 388acbca9


[FLINK-5228] [network] Fix LocalInputChannel re-trigger request and release 
deadlock


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/388acbca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/388acbca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/388acbca

Branch: refs/heads/release-1.1
Commit: 388acbca98200f13b0c016f9b79d900e355dc5ef
Parents: 0dc82ba
Author: Ufuk Celebi <[email protected]>
Authored: Fri Dec 2 12:52:58 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Dec 2 15:17:55 2016 +0100

----------------------------------------------------------------------
 .../partition/consumer/LocalInputChannel.java   | 38 ++++----
 .../consumer/LocalInputChannelTest.java         | 92 +++++++++++++++++++-
 2 files changed, 114 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 0a02ea1..51748b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -49,7 +49,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
        // 
------------------------------------------------------------------------
 
-       private final Object requestReleaseLock = new Object();
+       private final Object requestLock = new Object();
 
        /** The local partition manager. */
        private final ResultPartitionManager partitionManager;
@@ -99,9 +99,12 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
        @Override
        void requestSubpartition(int subpartitionIndex) throws IOException, 
InterruptedException {
+
+               boolean retriggerRequest = false;
+
                // The lock is required to request only once in the presence of 
retriggered requests.
-               synchronized (requestReleaseLock) {
-                       checkState(!isReleased, "released");
+               synchronized (requestLock) {
+                       checkState(!isReleased, "LocalInputChannel has been 
released already");
 
                        if (subpartitionView == null) {
                                LOG.debug("{}: Requesting LOCAL subpartition {} 
of partition {}.",
@@ -125,20 +128,27 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                                        }
                                } catch (PartitionNotFoundException notFound) {
                                        if (increaseBackoff()) {
-                                               
inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+                                               retriggerRequest = true;
                                        } else {
                                                throw notFound;
                                        }
                                }
                        }
                }
+
+               // Do this outside of the lock scope as this might lead to a
+               // deadlock with a concurrent release of the channel via the
+               // input gate.
+               if (retriggerRequest) {
+                       
inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
+               }
        }
 
        /**
         * Retriggers a subpartition request.
         */
        void retriggerSubpartitionRequest(Timer timer, final int 
subpartitionIndex) {
-               synchronized (requestReleaseLock) {
+               synchronized (requestLock) {
                        checkState(subpartitionView == null, "already requested 
partition");
 
                        timer.schedule(new TimerTask() {
@@ -193,7 +203,7 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
                // synchronizing on the request lock means this blocks until 
the asynchronous request
                // for the partition view has been completed
                // by then the subpartition view is visible or the channel is 
released
-               synchronized (requestReleaseLock) {
+               synchronized (requestLock) {
                        checkState(!isReleased, "released");
                        checkState(subpartitionView != null, "Queried for a 
buffer before requesting the subpartition.");
                        return subpartitionView;
@@ -231,19 +241,17 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
        }
 
        /**
-        * Releases the look ahead {@link Buffer} instance and discards the 
queue
-        * iterator.
+        * Releases the partition reader.
         */
        @Override
        void releaseAllResources() throws IOException {
-               synchronized (requestReleaseLock) {
-                       if (!isReleased) {
-                               isReleased = true;
+               if (!isReleased) {
+                       isReleased = true;
 
-                               if (subpartitionView != null) {
-                                       subpartitionView.releaseAllResources();
-                                       subpartitionView = null;
-                               }
+                       ResultSubpartitionView view = subpartitionView;
+                       if (view != null) {
+                               view.releaseAllResources();
+                               subpartitionView = null;
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/388acbca/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 9b36ea9..974ac9f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -254,6 +254,96 @@ public class LocalInputChannelTest {
                ch.getNextBuffer();
        }
 
+       /**
+        * Verifies that concurrent release via the SingleInputGate and 
re-triggering
+        * of a partition request works smoothly.
+        *
+        * - SingleInputGate acquires its request lock and tries to release all
+        * registered channels. When releasing a channel, it needs to acquire
+        * the channel's shared request-release lock.
+        * - If a LocalInputChannel concurrently retriggers a partition request 
via
+        * a Timer Thread it acquires the channel's request-release lock and 
calls
+        * the retrigger callback on the SingleInputGate, which again tries to
+        * acquire the gate's request lock.
+        *
+        * For certain timings this obviously leads to a deadlock. This test 
reliably
+        * reproduced such a timing (reported in FLINK-5228). This test is 
pretty much
+        * testing the buggy implementation and has not much more general 
value. If it
+        * becomes obsolete at some point (future greatness ;)), feel free to 
remove it.
+        *
+        * The fix in the end was to not acquire the channel's lock when 
releasing it
+        * and/or to not do any input gate callbacks while holding the 
channel's lock.
+        * I decided to do both.
+        */
+       @Test
+       public void testConcurrentReleaseAndRetriggerPartitionRequest() throws 
Exception {
+               final SingleInputGate gate = new SingleInputGate(
+                       "test task name",
+                       new JobID(),
+                       new ExecutionAttemptID(),
+                       new IntermediateDataSetID(),
+                       0,
+                       1,
+                       mock(PartitionStateChecker.class),
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()
+               );
+
+               ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
+               when(partitionManager
+                       .createSubpartitionView(
+                               any(ResultPartitionID.class),
+                               anyInt(),
+                               any(BufferProvider.class),
+                               any(BufferAvailabilityListener.class)))
+                       .thenAnswer(new Answer<ResultSubpartitionView>() {
+                               @Override
+                               public ResultSubpartitionView 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                       // Sleep here a little to give the 
releaser Thread
+                                       // time to acquire the input gate lock. 
We throw
+                                       // the Exception to retrigger the 
request.
+                                       Thread.sleep(100);
+                                       throw new 
PartitionNotFoundException(new ResultPartitionID());
+                               }
+                       });
+
+               final LocalInputChannel channel = new LocalInputChannel(
+                       gate,
+                       0,
+                       new ResultPartitionID(),
+                       partitionManager,
+                       new TaskEventDispatcher(),
+                       new Tuple2<>(1, 1),
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+               gate.setInputChannel(new IntermediateResultPartitionID(), 
channel);
+
+               Thread releaser = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       gate.releaseAllResources();
+                               } catch (IOException ignored) {
+                               }
+                       }
+               };
+
+               Thread requester = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       channel.requestSubpartition(0);
+                               } catch (IOException | InterruptedException 
ignored) {
+                               }
+                       }
+               };
+
+               requester.start();
+               releaser.start();
+
+               releaser.join();
+               requester.join();
+       }
+
        // 
---------------------------------------------------------------------------------------------
 
        private LocalInputChannel createLocalInputChannel(

Reply via email to