This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b2b0f71cb33fa749b9332ffd386f39c9acf6a0e
Author: Nico Kruber <[email protected]>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200

    [FLINK-10332][network] move data notification out of the synchronized block
    
    None of the notifications actually relies on being issued under the lock, so
    keeping them inside the synchronized block can only cause lock contention.
    
    This closes #6693.
---
 .../network/partition/PipelinedSubpartition.java   | 44 +++++++++++-----------
 1 file changed, 22 insertions(+), 22 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index d2d7fdb..fe27d97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -88,6 +88,7 @@ class PipelinedSubpartition extends ResultSubpartition {
        private boolean add(BufferConsumer bufferConsumer, boolean finish) {
                checkNotNull(bufferConsumer);
 
+               final boolean notifyDataAvailable;
                synchronized (buffers) {
                        if (isFinished || isReleased) {
                                bufferConsumer.close();
@@ -98,14 +99,13 @@ class PipelinedSubpartition extends ResultSubpartition {
                        buffers.add(bufferConsumer);
                        updateStatistics(bufferConsumer);
                        increaseBuffersInBacklog(bufferConsumer);
+                       notifyDataAvailable = shouldNotifyDataAvailable() || 
finish;
 
-                       if (finish) {
-                               isFinished = true;
-                               notifyDataAvailable();
-                       }
-                       else {
-                               maybeNotifyDataAvailable();
-                       }
+                       isFinished |= finish;
+               }
+
+               if (notifyDataAvailable) {
+                       notifyDataAvailable();
                }
 
                return true;
@@ -220,6 +220,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        @Override
        public PipelinedSubpartitionView 
createReadView(BufferAvailabilityListener availabilityListener) throws 
IOException {
+               final boolean notifyDataAvailable;
                synchronized (buffers) {
                        checkState(!isReleased);
                        checkState(readView == null,
@@ -230,9 +231,10 @@ class PipelinedSubpartition extends ResultSubpartition {
                                parent.getOwningTaskName(), index, 
parent.getPartitionId());
 
                        readView = new PipelinedSubpartitionView(this, 
availabilityListener);
-                       if (!buffers.isEmpty()) {
-                               notifyDataAvailable();
-                       }
+                       notifyDataAvailable = !buffers.isEmpty();
+               }
+               if (notifyDataAvailable) {
+                       notifyDataAvailable();
                }
 
                return readView;
@@ -283,26 +285,24 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        @Override
        public void flush() {
+               final boolean notifyDataAvailable;
                synchronized (buffers) {
                        if (buffers.isEmpty()) {
                                return;
                        }
-                       if (!flushRequested) {
-                               flushRequested = true; // set this before the 
notification!
-                               // if there is more then 1 buffer, we already 
notified the reader
-                               // (at the latest when adding the second buffer)
-                               if (buffers.size() == 1) {
-                                       notifyDataAvailable();
-                               }
-                       }
+                       // if there is more than 1 buffer, we already notified 
the reader
+                       // (at the latest when adding the second buffer)
+                       notifyDataAvailable = !flushRequested && buffers.size() 
== 1;
+                       flushRequested = true;
+               }
+               if (notifyDataAvailable) {
+                       notifyDataAvailable();
                }
        }
 
-       private void maybeNotifyDataAvailable() {
+       private boolean shouldNotifyDataAvailable() {
                // Notify only when we added first finished buffer.
-               if (getNumberOfFinishedBuffers() == 1) {
-                       notifyDataAvailable();
-               }
+               return readView != null && !flushRequested && 
getNumberOfFinishedBuffers() == 1;
        }
 
        private void notifyDataAvailable() {

Reply via email to