This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4cf197ede67dee9c4fbb41a4c5a8a61b40ddfa5d Author: Nico Kruber <n...@data-artisans.com> AuthorDate: Sun Aug 5 00:41:32 2018 +0200 [FLINK-10142][network] reduce locking around credit notification This closes #6555. --- .../io/network/netty/PartitionRequestClient.java | 5 +- .../partition/consumer/RemoteInputChannel.java | 12 +--- .../partition/consumer/RemoteInputChannelTest.java | 66 ++++++++++++++-------- 3 files changed, 43 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java index 27d341a..9c9deaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java @@ -171,10 +171,7 @@ public class PartitionRequestClient { } public void notifyCreditAvailable(RemoteInputChannel inputChannel) { - // We should skip the notification if the client is already closed. - if (!closeReferenceCounter.isDisposed()) { - clientHandler.notifyCreditAvailable(inputChannel); - } + clientHandler.notifyCreditAvailable(inputChannel); } public void close(RemoteInputChannel inputChannel) throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index c4954c0..6738abd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -289,10 +289,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, private void notifyCreditAvailable() { checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue."); - // We should skip the notification if this channel is already released. - if (!isReleased.get()) { - partitionRequestClient.notifyCreditAvailable(this); - } + partitionRequestClient.notifyCreditAvailable(this); } /** @@ -354,13 +351,6 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler, */ @Override public boolean notifyBufferAvailable(Buffer buffer) { - // Check the isReleased state outside synchronized block first to avoid - // deadlock with releaseAllResources running in parallel. - if (isReleased.get()) { - buffer.recycleBuffer(); - return false; - } - boolean recycleBuffer = true; try { boolean needMoreBuffers = false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 6305492..9141b36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -106,10 +106,33 @@ public class RemoteInputChannelTest { @Test public void testConcurrentOnBufferAndRelease() throws Exception { - // Config - // Repeatedly spawn two tasks: one to queue buffers and the other to release the channel - // concurrently. We do this repeatedly to provoke races. - final int numberOfRepetitions = 8192; + testConcurrentReleaseAndSomething(8192, (inputChannel, buffer, j) -> { + inputChannel.onBuffer(buffer, j, -1); + return null; + }); + } + + @Test + public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception { + testConcurrentReleaseAndSomething(1024, (inputChannel, buffer, j) -> + inputChannel.notifyBufferAvailable(buffer) + ); + } + + private interface TriFunction<T, U, V, R> { + R apply(T t, U u, V v) throws Exception; + } + + /** + * Repeatedly spawns two tasks: one to call <tt>function</tt> and the other to release the + * channel concurrently. We do this repeatedly to provoke races. + * + * @param numberOfRepetitions how often to repeat the test + * @param function function to call concurrently to {@link RemoteInputChannel#releaseAllResources()} + */ + private void testConcurrentReleaseAndSomething( + final int numberOfRepetitions, + TriFunction<RemoteInputChannel, Buffer, Integer, Object> function) throws Exception { // Setup final ExecutorService executor = Executors.newFixedThreadPool(2); @@ -122,30 +145,23 @@ public class RemoteInputChannelTest { for (int i = 0; i < numberOfRepetitions; i++) { final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); - final Callable<Void> enqueueTask = new Callable<Void>() { - @Override - public Void call() throws Exception { - while (true) { - for (int j = 0; j < 128; j++) { - // this is the same buffer over and over again which will be - // recycled by the RemoteInputChannel - inputChannel.onBuffer(buffer.retainBuffer(), j, -1); - } + final Callable<Void> enqueueTask = () -> { + while (true) { + for (int j = 0; j < 128; j++) { + // this is the same buffer over and over again which will be + // recycled by the RemoteInputChannel + function.apply(inputChannel, buffer.retainBuffer(), j); + } - if (inputChannel.isReleased()) { - return null; - } + if (inputChannel.isReleased()) { + return null; } } }; - final Callable<Void> releaseTask = new Callable<Void>() { - @Override - public Void call() throws Exception { - inputChannel.releaseAllResources(); - - return null; - } + final Callable<Void> releaseTask = () -> { + inputChannel.releaseAllResources(); + return null; }; // Submit tasks and wait to finish @@ -158,8 +174,8 @@ public class RemoteInputChannelTest { result.get(); } - assertEquals("Resource leak during concurrent release and enqueue.", - 0, inputChannel.getNumberOfQueuedBuffers()); + assertEquals("Resource leak during concurrent release and notifyBufferAvailable.", + 0, inputChannel.getNumberOfQueuedBuffers()); } } finally {