This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 16ea186fef3ac7eee8679f8e87132313508573b3 Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 11:13:55 2018 +0200 [FLINK-10331][network] reduce unnecessary flushing Do not flush (again) if - a previous flush request has not been completely handled yet and/or is still enqueued or - the network stack is still polling from this subpartition and doesn't need a new notification. This closes #6692. --- .../network/partition/PipelinedSubpartition.java | 46 ++++++++++++++++------ .../partition/PipelinedSubpartitionTest.java | 45 +++++++++++++++++++++ 2 files changed, 79 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 91e0d4b..d2d7fdb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -36,6 +36,19 @@ import static org.apache.flink.util.Preconditions.checkState; /** * A pipelined in-memory only subpartition, which can be consumed once. + * + * <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second + * {@link BufferConsumer} (in which case we will assume the first one finished), we will + * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via + * {@link #createReadView(BufferAvailabilityListener)} of new data availability. Unless + * {@link #flush()} is called explicitly, we only notify when the first finished buffer turns up and + * then the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows + * that no more buffers are available.
This results in a buffer queue which is either empty or has an + * unfinished {@link BufferConsumer} left from which the notifications will eventually start again. + * + * <p>Explicit calls to {@link #flush()} will force this + * {@link PipelinedSubpartitionView#notifyDataAvailable() notification} for any + * {@link BufferConsumer} present in the queue. */ class PipelinedSubpartition extends ResultSubpartition { @@ -67,17 +80,6 @@ class PipelinedSubpartition extends ResultSubpartition { } @Override - public void flush() { - synchronized (buffers) { - if (buffers.isEmpty()) { - return; - } - flushRequested = !buffers.isEmpty(); - notifyDataAvailable(); - } - } - - @Override public void finish() throws IOException { add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this); @@ -99,7 +101,7 @@ class PipelinedSubpartition extends ResultSubpartition { if (finish) { isFinished = true; - flush(); + notifyDataAvailable(); } else { maybeNotifyDataAvailable(); @@ -279,6 +281,23 @@ class PipelinedSubpartition extends ResultSubpartition { return Math.max(buffers.size(), 0); } + @Override + public void flush() { + synchronized (buffers) { + if (buffers.isEmpty()) { + return; + } + if (!flushRequested) { + flushRequested = true; // set this before the notification! + // if there is more than 1 buffer, we already notified the reader + // (at the latest when adding the second buffer) + if (buffers.size() == 1) { + notifyDataAvailable(); + } + } + } + } + private void maybeNotifyDataAvailable() { // Notify only when we added first finished buffer.
if (getNumberOfFinishedBuffers() == 1) { @@ -295,6 +314,9 @@ class PipelinedSubpartition extends ResultSubpartition { private int getNumberOfFinishedBuffers() { assert Thread.holdsLock(buffers); + // NOTE: isFinished() is not guaranteed to provide the most up-to-date state here + // worst-case: a single finished buffer sits around until the next flush() call + // (but we do not offer stronger guarantees anyway) if (buffers.size() == 1 && buffers.peekLast().isFinished()) { return 1; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 90bdb82..b75bb7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -47,6 +47,8 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils. 
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE; import static org.apache.flink.util.FutureUtil.waitForAll; import static org.apache.flink.util.Preconditions.checkState; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -160,7 +162,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { subpartition.add(createFilledBufferConsumer(1025)); // finished subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + assertThat(availablityListener.getNumNotifications(), greaterThan(0L)); assertNextBuffer(readView, 1025, false, 1, false, true); + // not notified, but we could still access the unfinished buffer + assertNextBuffer(readView, 1024, false, 1, false, false); + assertNoNextBuffer(readView); } finally { subpartition.release(); } @@ -179,10 +185,49 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { try { subpartition.add(createFilledBufferConsumer(1025)); // finished subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + long oldNumNotifications = availablityListener.getNumNotifications(); subpartition.flush(); + // buffer queue size is > 1, so the reader has already been notified; no further notification necessary + assertThat(oldNumNotifications, greaterThan(0L)); + assertEquals(oldNumNotifications, availablityListener.getNumNotifications()); assertNextBuffer(readView, 1025, true, 1, false, true); assertNextBuffer(readView, 1024, false, 1, false, false); + assertNoNextBuffer(readView); + } finally { + subpartition.release(); + } + } + + /** + * A flush call with exactly one buffer in the queue should always notify consumers (unless already flushed).
+ */ + @Test + public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception { + final ResultSubpartition subpartition = createSubpartition(); + AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener(); + ResultSubpartitionView readView = subpartition.createReadView(availablityListener); + + try { + // no buffers -> no notification or any other effects + subpartition.flush(); + assertEquals(0, availablityListener.getNumNotifications()); + + subpartition.add(createFilledBufferConsumer(1025)); // finished + subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished + + assertNextBuffer(readView, 1025, false, 1, false, true); + + long oldNumNotifications = availablityListener.getNumNotifications(); + subpartition.flush(); + // buffer queue is 1 again -> need to flush + assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications()); + subpartition.flush(); + // calling again should not flush again + assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications()); + + assertNextBuffer(readView, 1024, false, 1, false, false); + assertNoNextBuffer(readView); } finally { subpartition.release(); }
