This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3dadd6fe3fa3e163ec9e25bff52c642e91f683f
Author: Nico Kruber <[email protected]>
AuthorDate: Tue Aug 21 17:34:49 2018 +0200

    [FLINK-10141][network] move notifications outside the bufferQueue and receivedBuffers locks
    
    This means notifyCreditAvailable() and notifyChannelNonEmpty() are called
    without any locks being held. The change is safe since these callbacks do
    their own synchronization already (if needed).
    
    This closes #6553.
---
 .../partition/consumer/RemoteInputChannel.java     | 62 +++++++++++++---------
 1 file changed, 37 insertions(+), 25 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 79d25c6..c4954c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -306,8 +306,8 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                int numAddedBuffers;
 
                synchronized (bufferQueue) {
-                       // Important: check the isReleased state inside 
synchronized block, so there is no
-                       // race condition when recycle and releaseAllResources 
running in parallel.
+                       // Similar to notifyBufferAvailable(), make sure that 
we never add a buffer
+                       // after releaseAllResources() released all buffers 
(see below for details).
                        if (isReleased.get()) {
                                try {
                                        
inputGate.returnExclusiveSegments(Collections.singletonList(segment));
@@ -368,8 +368,13 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                                checkState(isWaitingForFloatingBuffers,
                                        "This channel should be waiting for 
floating buffers.");
 
-                               // Important: double check the isReleased state 
inside synchronized block, so there is no
-                               // race condition when notifyBufferAvailable 
and releaseAllResources running in parallel.
+                               // Important: make sure that we never add a 
buffer after releaseAllResources()
+                               // released all buffers. Following scenarios 
exist:
+                               // 1) releaseAllResources() already released 
buffers inside bufferQueue
+                               // -> then isReleased is set correctly
+                               // 2) releaseAllResources() did not yet release 
buffers from bufferQueue
+                               // -> we may or may not have set isReleased yet 
but will always wait for the
+                               //    lock on bufferQueue to release buffers
                                if (isReleased.get() || 
bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                                        isWaitingForFloatingBuffers = false;
                                        recycleBuffer = false; // just in case
@@ -385,10 +390,10 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                                } else {
                                        needMoreBuffers = true;
                                }
+                       }
 
-                               if (unannouncedCredit.getAndAdd(1) == 0) {
-                                       notifyCreditAvailable();
-                               }
+                       if (unannouncedCredit.getAndAdd(1) == 0) {
+                               notifyCreditAvailable();
                        }
 
                        return needMoreBuffers;
@@ -484,8 +489,8 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
                int numRequestedBuffers = 0;
 
                synchronized (bufferQueue) {
-                       // Important: check the isReleased state inside 
synchronized block, so there is no
-                       // race condition when onSenderBacklog and 
releaseAllResources running in parallel.
+                       // Similar to notifyBufferAvailable(), make sure that 
we never add a buffer
+                       // after releaseAllResources() released all buffers 
(see above for details).
                        if (isReleased.get()) {
                                return;
                        }
@@ -510,33 +515,40 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler,
        }
 
        public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) 
throws IOException {
-               boolean success = false;
+               boolean recycleBuffer = true;
 
                try {
+
+                       final boolean wasEmpty;
                        synchronized (receivedBuffers) {
-                               if (!isReleased.get()) {
-                                       if (expectedSequenceNumber == 
sequenceNumber) {
-                                               final boolean wasEmpty = 
receivedBuffers.isEmpty();
+                               // Similar to notifyBufferAvailable(), make 
sure that we never add a buffer
+                               // after releaseAllResources() released all 
buffers from receivedBuffers
+                               // (see above for details).
+                               if (isReleased.get()) {
+                                       return;
+                               }
 
-                                               receivedBuffers.add(buffer);
-                                               expectedSequenceNumber++;
+                               if (expectedSequenceNumber != sequenceNumber) {
+                                       onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
+                                       return;
+                               }
 
-                                               if (wasEmpty) {
-                                                       notifyChannelNonEmpty();
-                                               }
+                               wasEmpty = receivedBuffers.isEmpty();
+                               receivedBuffers.add(buffer);
+                               recycleBuffer = false;
+                       }
 
-                                               success = true;
-                                       } else {
-                                               onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
-                                       }
-                               }
+                       ++expectedSequenceNumber;
+
+                       if (wasEmpty) {
+                               notifyChannelNonEmpty();
                        }
 
-                       if (success && backlog >= 0) {
+                       if (backlog >= 0) {
                                onSenderBacklog(backlog);
                        }
                } finally {
-                       if (!success) {
+                       if (recycleBuffer) {
                                buffer.recycleBuffer();
                        }
                }

Reply via email to