Repository: flink Updated Branches: refs/heads/master dbe707324 -> 2fcef5ecf
[FLINK-5169] [network] Fix spillable subpartition buffer count Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fcef5ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fcef5ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fcef5ec Branch: refs/heads/master Commit: 2fcef5ecf473f82a3894b136e48b53b81b465356 Parents: c0cdc5c Author: Ufuk Celebi <u...@apache.org> Authored: Thu Dec 1 18:38:30 2016 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Thu Dec 1 21:42:49 2016 +0100 ---------------------------------------------------------------------- .../netty/SequenceNumberingViewReader.java | 10 ++ .../partition/SpillableSubpartition.java | 5 + .../partition/SpillableSubpartitionView.java | 22 +++- .../partition/SpilledSubpartitionView.java | 13 +- .../partition/SpillableSubpartitionTest.java | 130 +++++++++++++++++++ 5 files changed, 178 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index ef611eb..5036bb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -127,4 +127,14 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener { requestQueue.notifyReaderNonEmpty(this); } } + + @Override + public String toString() { + return "SequenceNumberingViewReader{" + + "requestLock=" + requestLock 
+ ", receiverId=" + receiverId + + ", numBuffersAvailable=" + numBuffersAvailable.get() + + ", sequenceNumber=" + sequenceNumber + + '}'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java index 439e08d..ad04e97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java @@ -91,6 +91,11 @@ class SpillableSubpartition extends ResultSubpartition { return false; } + // The number of buffers is needed later when creating + // the read views. If you ever remove this line here, + // make sure to still count the number of buffers. 
+ updateStatistics(buffer); + if (spillWriter == null) { buffers.add(buffer); http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 8119ecc..533f95b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayDeque; @@ -30,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; class SpillableSubpartitionView implements ResultSubpartitionView { + private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class); + /** The subpartition this view belongs to. */ private final SpillableSubpartition parent; @@ -51,6 +55,9 @@ class SpillableSubpartitionView implements ResultSubpartitionView { private final AtomicBoolean isReleased = new AtomicBoolean(false); + /** Remember the number of buffers this view was created with. */ + private final long numBuffers; + /** * The next buffer to hand out. Everytime this is set to a non-null value, * a listener notification happens. 
@@ -73,6 +80,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { this.listener = checkNotNull(listener); synchronized (buffers) { + numBuffers = buffers.size(); nextBuffer = buffers.poll(); } @@ -94,9 +102,12 @@ class SpillableSubpartitionView implements ResultSubpartitionView { // Create the spill writer and write all buffers to disk BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel()); + long spilledBytes = 0; + int numBuffers = buffers.size(); for (int i = 0; i < numBuffers; i++) { Buffer buffer = buffers.remove(); + spilledBytes += buffer.getSize(); try { spillWriter.writeBlock(buffer); } finally { @@ -111,6 +122,11 @@ class SpillableSubpartitionView implements ResultSubpartitionView { numBuffers, listener); + LOG.debug("Spilling {} bytes for sub partition {} of {}.", + spilledBytes, + parent.index, + parent.parent.getPartitionId()); + return numBuffers; } } @@ -188,8 +204,12 @@ class SpillableSubpartitionView implements ResultSubpartitionView { @Override public String toString() { - return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s", + boolean hasSpilled = spilledView != null; + + return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? 
%b) of ResultPartition %s", parent.index, + numBuffers, + hasSpilled, parent.parent.getPartitionId()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index b087a4e..7488132 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.util.event.NotificationListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayDeque; @@ -46,6 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener { + private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class); + /** The subpartition this view belongs to. */ private final ResultSubpartition parent; @@ -91,6 +95,9 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis if (!spillWriter.registerAllRequestsProcessedListener(this)) { isSpillInProgress = false; availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers); + LOG.debug("No spilling in progress. Notified about {} available buffers.", numberOfSpilledBuffers); + } else { + LOG.debug("Spilling in progress. 
Waiting with notification about {} available buffers.", numberOfSpilledBuffers); } } @@ -103,6 +110,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis public void onNotification() { isSpillInProgress = false; availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers); + LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers); } @Override @@ -158,7 +166,10 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis @Override public String toString() { - return String.format("SpilledSubpartitionView[sync](index: %d) of ResultPartition %s", parent.index, parent.parent.getPartitionId()); + return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s", + parent.index, + numberOfSpilledBuffers, + parent.parent.getPartitionId()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java index b7a54d7..b53ef68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java @@ -18,11 +18,15 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; 
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.junit.AfterClass; import org.junit.Test; @@ -36,12 +40,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class SpillableSubpartitionTest extends SubpartitionTestBase { @@ -153,4 +161,126 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase { assertNull(readView.getNextBuffer()); } + + /** + * Tests that a spilled partition is correctly read back in via a spilled + * read view. 
+ */ + @Test + public void testConsumeSpilledPartition() throws Exception { + ResultPartition parent = mock(ResultPartition.class); + SpillableSubpartition partition = new SpillableSubpartition( + 0, + parent, + ioManager); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + buffer.retain(); + buffer.retain(); + + partition.add(buffer); + partition.add(buffer); + partition.add(buffer); + + assertEquals(3, partition.releaseMemory()); + + partition.finish(); + + BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class); + SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener); + + verify(listener, times(1)).notifyBuffersAvailable(eq(4L)); + + Buffer read = reader.getNextBuffer(); + assertNotNull(read); + read.recycle(); + + read = reader.getNextBuffer(); + assertNotNull(read); + read.recycle(); + + read = reader.getNextBuffer(); + assertNotNull(read); + read.recycle(); + + // End of partition + read = reader.getNextBuffer(); + assertNotNull(read); + assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass()); + read.recycle(); + } + + /** + * Tests that a spillable partition, which is spilled while it is being consumed, + * is still read back in correctly. 
+ */ + @Test + public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception { + ResultPartition parent = mock(ResultPartition.class); + SpillableSubpartition partition = new SpillableSubpartition( + 0, + parent, + ioManager); + + Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE); + buffer.retain(); + buffer.retain(); + + partition.add(buffer); + partition.add(buffer); + partition.add(buffer); + partition.finish(); + + AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener(); + SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener); + + // Initial notification + assertEquals(1, listener.getNumNotifiedBuffers()); + + Buffer read = reader.getNextBuffer(); + assertNotNull(read); + read.recycle(); + assertEquals(2, listener.getNumNotifiedBuffers()); + + // Spill now + assertEquals(2, partition.releaseMemory()); + + listener.awaitNotifications(4, 30_000); + assertEquals(4, listener.getNumNotifiedBuffers()); + + read = reader.getNextBuffer(); + assertNotNull(read); + read.recycle(); + + read = reader.getNextBuffer(); + assertNotNull(read); + read.recycle(); + + // End of partition + read = reader.getNextBuffer(); + assertNotNull(read); + assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass()); + read.recycle(); + } + + private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener { + + private volatile long numNotifiedBuffers; + + @Override + public void notifyBuffersAvailable(long numBuffers) { + numNotifiedBuffers += numBuffers; + } + + long getNumNotifiedBuffers() { + return numNotifiedBuffers; + } + + void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMillis; + while (numNotifiedBuffers < 
awaitedNumNotifiedBuffers && System.currentTimeMillis() < deadline) { + Thread.sleep(1); + } + } + } }