This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4cf197ede67dee9c4fbb41a4c5a8a61b40ddfa5d
Author: Nico Kruber <n...@data-artisans.com>
AuthorDate: Sun Aug 5 00:41:32 2018 +0200

    [FLINK-10142][network] reduce locking around credit notification
    
    This closes #6555.
---
 .../io/network/netty/PartitionRequestClient.java   |  5 +-
 .../partition/consumer/RemoteInputChannel.java     | 12 +---
 .../partition/consumer/RemoteInputChannelTest.java | 66 ++++++++++++++--------
 3 files changed, 43 insertions(+), 40 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index 27d341a..9c9deaa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -171,10 +171,7 @@ public class PartitionRequestClient {
        }
 
        public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-               // We should skip the notification if the client is already 
closed.
-               if (!closeReferenceCounter.isDisposed()) {
-                       clientHandler.notifyCreditAvailable(inputChannel);
-               }
+               clientHandler.notifyCreditAvailable(inputChannel);
        }
 
        public void close(RemoteInputChannel inputChannel) throws IOException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index c4954c0..6738abd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -289,10 +289,7 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
        private void notifyCreditAvailable() {
                checkState(partitionRequestClient != null, "Tried to send task 
event to producer before requesting a queue.");
 
-               // We should skip the notification if this channel is already 
released.
-               if (!isReleased.get()) {
-                       partitionRequestClient.notifyCreditAvailable(this);
-               }
+               partitionRequestClient.notifyCreditAvailable(this);
        }
 
        /**
@@ -354,13 +351,6 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
         */
        @Override
        public boolean notifyBufferAvailable(Buffer buffer) {
-               // Check the isReleased state outside synchronized block first 
to avoid
-               // deadlock with releaseAllResources running in parallel.
-               if (isReleased.get()) {
-                       buffer.recycleBuffer();
-                       return false;
-               }
-
                boolean recycleBuffer = true;
                try {
                        boolean needMoreBuffers = false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 6305492..9141b36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -106,10 +106,33 @@ public class RemoteInputChannelTest {
 
        @Test
        public void testConcurrentOnBufferAndRelease() throws Exception {
-               // Config
-               // Repeatedly spawn two tasks: one to queue buffers and the 
other to release the channel
-               // concurrently. We do this repeatedly to provoke races.
-               final int numberOfRepetitions = 8192;
+               testConcurrentReleaseAndSomething(8192, (inputChannel, buffer, 
j) -> {
+                       inputChannel.onBuffer(buffer, j, -1);
+                       return null;
+               });
+       }
+
+       @Test
+       public void testConcurrentNotifyBufferAvailableAndRelease() throws 
Exception {
+               testConcurrentReleaseAndSomething(1024, (inputChannel, buffer, 
j) ->
+                       inputChannel.notifyBufferAvailable(buffer)
+               );
+       }
+
+       private interface TriFunction<T, U, V, R> {
+               R apply(T t, U u, V v) throws Exception;
+       }
+
+       /**
+        * Repeatedly spawns two tasks: one to call <tt>function</tt> and the 
other to release the
+        * channel concurrently. We do this repeatedly to provoke races.
+        *
+        * @param numberOfRepetitions how often to repeat the test
+        * @param function function to call concurrently to {@link 
RemoteInputChannel#releaseAllResources()}
+        */
+       private void testConcurrentReleaseAndSomething(
+                       final int numberOfRepetitions,
+                       TriFunction<RemoteInputChannel, Buffer, Integer, 
Object> function) throws Exception {
 
                // Setup
                final ExecutorService executor = 
Executors.newFixedThreadPool(2);
@@ -122,30 +145,23 @@ public class RemoteInputChannelTest {
                        for (int i = 0; i < numberOfRepetitions; i++) {
                                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
 
-                               final Callable<Void> enqueueTask = new 
Callable<Void>() {
-                                       @Override
-                                       public Void call() throws Exception {
-                                               while (true) {
-                                                       for (int j = 0; j < 
128; j++) {
-                                                               // this is the 
same buffer over and over again which will be
-                                                               // recycled by 
the RemoteInputChannel
-                                                               
inputChannel.onBuffer(buffer.retainBuffer(), j, -1);
-                                                       }
+                               final Callable<Void> enqueueTask = () -> {
+                                       while (true) {
+                                               for (int j = 0; j < 128; j++) {
+                                                       // this is the same 
buffer over and over again which will be
+                                                       // recycled by the 
RemoteInputChannel
+                                                       
function.apply(inputChannel, buffer.retainBuffer(), j);
+                                               }
 
-                                                       if 
(inputChannel.isReleased()) {
-                                                               return null;
-                                                       }
+                                               if (inputChannel.isReleased()) {
+                                                       return null;
                                                }
                                        }
                                };
 
-                               final Callable<Void> releaseTask = new 
Callable<Void>() {
-                                       @Override
-                                       public Void call() throws Exception {
-                                               
inputChannel.releaseAllResources();
-
-                                               return null;
-                                       }
+                               final Callable<Void> releaseTask = () -> {
+                                       inputChannel.releaseAllResources();
+                                       return null;
                                };
 
                                // Submit tasks and wait to finish
@@ -158,8 +174,8 @@ public class RemoteInputChannelTest {
                                        result.get();
                                }
 
-                               assertEquals("Resource leak during concurrent 
release and enqueue.",
-                                               0, 
inputChannel.getNumberOfQueuedBuffers());
+                               assertEquals("Resource leak during concurrent 
release and notifyBufferAvailable.",
+                                       0, 
inputChannel.getNumberOfQueuedBuffers());
                        }
                }
                finally {

Reply via email to