This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a9e5f70515a3cbad7e776b9248e841f6c81c9b7a
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Sep 13 11:13:55 2018 +0200

    [FLINK-10331][network] reduce unnecessary flushing
    
    Do not flush (again) if
    - a previous flush request has not been completely handled yet and/or is still enqueued, or
    - the network stack is still polling from this subpartition and doesn't need a new notification
    
    This closes #6692.
---
 .../network/partition/PipelinedSubpartition.java   | 46 ++++++++++++++++------
 .../partition/PipelinedSubpartitionTest.java       | 45 +++++++++++++++++++++
 2 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 91e0d4b..d2d7fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -36,6 +36,19 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A pipelined in-memory only subpartition, which can be consumed once.
+ *
+ * <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second
+ * {@link BufferConsumer} (in which case we will assume the first one finished), we will
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via
+ * {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling
+ * {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and
+ * then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows
+ * no more buffers being available. This results in a buffer queue which is either empty or has an
+ * unfinished {@link BufferConsumer} left from which the notifications will eventually start again.
+ *
+ * <p>Explicit calls to {@link #flush()} will force this
+ * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any
+ * {@link BufferConsumer} present in the queue.
  */
 class PipelinedSubpartition extends ResultSubpartition {
 
@@ -67,17 +80,6 @@ class PipelinedSubpartition extends ResultSubpartition {
        }
 
        @Override
-       public void flush() {
-               synchronized (buffers) {
-                       if (buffers.isEmpty()) {
-                               return;
-                       }
-                       flushRequested = !buffers.isEmpty();
-                       notifyDataAvailable();
-               }
-       }
-
-       @Override
        public void finish() throws IOException {
                
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
                LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
@@ -99,7 +101,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
                        if (finish) {
                                isFinished = true;
-                               flush();
+                               notifyDataAvailable();
                        }
                        else {
                                maybeNotifyDataAvailable();
@@ -279,6 +281,23 @@ class PipelinedSubpartition extends ResultSubpartition {
                return Math.max(buffers.size(), 0);
        }
 
+       @Override
+       public void flush() {
+               synchronized (buffers) {
+                       if (buffers.isEmpty()) {
+                               return;
+                       }
+                       if (!flushRequested) {
+                               flushRequested = true; // set this before the 
notification!
+                               // if there is more than 1 buffer, we already notified the reader
+                               // (at the latest when adding the second buffer)
+                               if (buffers.size() == 1) {
+                                       notifyDataAvailable();
+                               }
+                       }
+               }
+       }
+
        private void maybeNotifyDataAvailable() {
                // Notify only when we added first finished buffer.
                if (getNumberOfFinishedBuffers() == 1) {
@@ -295,6 +314,9 @@ class PipelinedSubpartition extends ResultSubpartition {
        private int getNumberOfFinishedBuffers() {
                assert Thread.holdsLock(buffers);
 
+               // NOTE: isFinished() is not guaranteed to provide the most up-to-date state here
+               // worst-case: a single finished buffer sits around until the next flush() call
+               // (but we do not offer stronger guarantees anyway)
                if (buffers.size() == 1 && buffers.peekLast().isFinished()) {
                        return 1;
                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 90bdb82..b75bb7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -47,6 +47,8 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static 
org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -160,7 +162,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                        subpartition.add(createFilledBufferConsumer(1025)); // 
finished
                        
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
 
+                       assertThat(availablityListener.getNumNotifications(), 
greaterThan(0L));
                        assertNextBuffer(readView, 1025, false, 1, false, true);
+                       // not notified, but we could still access the unfinished buffer
+                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
+                       assertNoNextBuffer(readView);
                } finally {
                        subpartition.release();
                }
@@ -179,10 +185,49 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                try {
                        subpartition.add(createFilledBufferConsumer(1025)); // 
finished
                        
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+                       long oldNumNotifications = 
availablityListener.getNumNotifications();
                        subpartition.flush();
+                       // buffer queue is > 1, should already be notified, no further notification necessary
+                       assertThat(oldNumNotifications, greaterThan(0L));
+                       assertEquals(oldNumNotifications, 
availablityListener.getNumNotifications());
 
                        assertNextBuffer(readView, 1025, true, 1, false, true);
                        assertNextBuffer(readView, 1024, false, 1, false, 
false);
+                       assertNoNextBuffer(readView);
+               } finally {
+                       subpartition.release();
+               }
+       }
+
+       /**
+        * A flush call with a single buffer in the queue should always notify consumers (unless already flushed).
+        */
+       @Test
+       public void testFlushWithUnfinishedBufferBehindFinished2() throws 
Exception {
+               final ResultSubpartition subpartition = createSubpartition();
+               AwaitableBufferAvailablityListener availablityListener = new 
AwaitableBufferAvailablityListener();
+               ResultSubpartitionView readView = 
subpartition.createReadView(availablityListener);
+
+               try {
+                       // no buffers -> no notification or any other effects
+                       subpartition.flush();
+                       assertEquals(0, 
availablityListener.getNumNotifications());
+
+                       subpartition.add(createFilledBufferConsumer(1025)); // 
finished
+                       
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // 
not finished
+
+                       assertNextBuffer(readView, 1025, false, 1, false, true);
+
+                       long oldNumNotifications = 
availablityListener.getNumNotifications();
+                       subpartition.flush();
+                       // buffer queue is 1 again -> need to flush
+                       assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
+                       subpartition.flush();
+                       // calling again should not flush again
+                       assertEquals(oldNumNotifications + 1, 
availablityListener.getNumNotifications());
+
+                       assertNextBuffer(readView, 1024, false, 1, false, 
false);
+                       assertNoNextBuffer(readView);
                } finally {
                        subpartition.release();
                }

Reply via email to