Repository: flink
Updated Branches:
  refs/heads/master dbe707324 -> 2fcef5ecf


[FLINK-5169] [network] Fix spillable subpartition buffer count


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fcef5ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fcef5ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fcef5ec

Branch: refs/heads/master
Commit: 2fcef5ecf473f82a3894b136e48b53b81b465356
Parents: c0cdc5c
Author: Ufuk Celebi <u...@apache.org>
Authored: Thu Dec 1 18:38:30 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Thu Dec 1 21:42:49 2016 +0100

----------------------------------------------------------------------
 .../netty/SequenceNumberingViewReader.java      |  10 ++
 .../partition/SpillableSubpartition.java        |   5 +
 .../partition/SpillableSubpartitionView.java    |  22 +++-
 .../partition/SpilledSubpartitionView.java      |  13 +-
 .../partition/SpillableSubpartitionTest.java    | 130 +++++++++++++++++++
 5 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index ef611eb..5036bb7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -127,4 +127,14 @@ class SequenceNumberingViewReader implements 
BufferAvailabilityListener {
                        requestQueue.notifyReaderNonEmpty(this);
                }
        }
+
+       @Override
+       public String toString() {
+               return "SequenceNumberingViewReader{" +
+                       "requestLock=" + requestLock +
+                       ", receiverId=" + receiverId +
+                       ", numBuffersAvailable=" + numBuffersAvailable.get() +
+                       ", sequenceNumber=" + sequenceNumber +
+                       '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 439e08d..ad04e97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -91,6 +91,11 @@ class SpillableSubpartition extends ResultSubpartition {
                                return false;
                        }
 
+                       // The number of buffers are needed later when creating
+                       // the read views. If you ever remove this line here,
+                       // make sure to still count the number of buffers.
+                       updateStatistics(buffer);
+
                        if (spillWriter == null) {
                                buffers.add(buffer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 8119ecc..533f95b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -30,6 +32,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 class SpillableSubpartitionView implements ResultSubpartitionView {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(SpillableSubpartitionView.class);
+
        /** The subpartition this view belongs to. */
        private final SpillableSubpartition parent;
 
@@ -51,6 +55,9 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
 
        private final AtomicBoolean isReleased = new AtomicBoolean(false);
 
+       /** Remember the number of buffers this view was created with. */
+       private final long numBuffers;
+
        /**
         * The next buffer to hand out. Everytime this is set to a non-null 
value,
         * a listener notification happens.
@@ -73,6 +80,7 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                this.listener = checkNotNull(listener);
 
                synchronized (buffers) {
+                       numBuffers = buffers.size();
                        nextBuffer = buffers.poll();
                }
 
@@ -94,9 +102,12 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                                // Create the spill writer and write all 
buffers to disk
                                BufferFileWriter spillWriter = 
ioManager.createBufferFileWriter(ioManager.createChannel());
 
+                               long spilledBytes = 0;
+
                                int numBuffers = buffers.size();
                                for (int i = 0; i < numBuffers; i++) {
                                        Buffer buffer = buffers.remove();
+                                       spilledBytes += buffer.getSize();
                                        try {
                                                spillWriter.writeBlock(buffer);
                                        } finally {
@@ -111,6 +122,11 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
                                        numBuffers,
                                        listener);
 
+                               LOG.debug("Spilling {} bytes for sub partition 
{} of {}.",
+                                       spilledBytes,
+                                       parent.index,
+                                       parent.parent.getPartitionId());
+
                                return numBuffers;
                        }
                }
@@ -188,8 +204,12 @@ class SpillableSubpartitionView implements 
ResultSubpartitionView {
 
        @Override
        public String toString() {
-               return String.format("SpillableSubpartitionView(index: %d) of 
ResultPartition %s",
+               boolean hasSpilled = spilledView != null;
+
+               return String.format("SpillableSubpartitionView(index: %d, 
buffers: %d, spilled? {}) of ResultPartition %s",
                        parent.index,
+                       numBuffers,
+                       hasSpilled,
                        parent.parent.getPartitionId());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index b087a4e..7488132 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.util.event.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -46,6 +48,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 class SpilledSubpartitionView implements ResultSubpartitionView, 
NotificationListener {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(SpilledSubpartitionView.class);
+
        /** The subpartition this view belongs to. */
        private final ResultSubpartition parent;
 
@@ -91,6 +95,9 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
                if (!spillWriter.registerAllRequestsProcessedListener(this)) {
                        isSpillInProgress = false;
                        
availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+                       LOG.debug("No spilling in progress. Notified about {} 
available buffers.", numberOfSpilledBuffers);
+               } else {
+                       LOG.debug("Spilling in progress. Waiting with 
notification about {} available buffers.", numberOfSpilledBuffers);
                }
        }
 
@@ -103,6 +110,7 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
        public void onNotification() {
                isSpillInProgress = false;
                
availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+               LOG.debug("Finished spilling. Notified about {} available 
buffers.", numberOfSpilledBuffers);
        }
 
        @Override
@@ -158,7 +166,10 @@ class SpilledSubpartitionView implements 
ResultSubpartitionView, NotificationLis
 
        @Override
        public String toString() {
-               return String.format("SpilledSubpartitionView[sync](index: %d) 
of ResultPartition %s", parent.index, parent.parent.getPartitionId());
+               return String.format("SpilledSubpartitionView(index: %d, 
buffers: {}) of ResultPartition %s",
+                       parent.index,
+                       numberOfSpilledBuffers,
+                       parent.parent.getPartitionId());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2fcef5ec/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index b7a54d7..b53ef68 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -36,12 +40,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class SpillableSubpartitionTest extends SubpartitionTestBase {
@@ -153,4 +161,126 @@ public class SpillableSubpartitionTest extends 
SubpartitionTestBase {
 
                assertNull(readView.getNextBuffer());
        }
+
+       /**
+        * Tests that a spilled partition is correctly read back in via a 
spilled
+        * read view.
+        */
+       @Test
+       public void testConsumeSpilledPartition() throws Exception {
+               ResultPartition parent = mock(ResultPartition.class);
+               SpillableSubpartition partition = new SpillableSubpartition(
+                       0,
+                       parent,
+                       ioManager);
+
+               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
+               buffer.retain();
+               buffer.retain();
+
+               partition.add(buffer);
+               partition.add(buffer);
+               partition.add(buffer);
+
+               assertEquals(3, partition.releaseMemory());
+
+               partition.finish();
+
+               BufferAvailabilityListener listener = 
mock(BufferAvailabilityListener.class);
+               SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(new TestInfiniteBufferProvider(), listener);
+
+               verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
+
+               Buffer read = reader.getNextBuffer();
+               assertNotNull(read);
+               read.recycle();
+
+               read = reader.getNextBuffer();
+               assertNotNull(read);
+               read.recycle();
+
+               read = reader.getNextBuffer();
+               assertNotNull(read);
+               read.recycle();
+
+               // End of partition
+               read = reader.getNextBuffer();
+               assertNotNull(read);
+               assertEquals(EndOfPartitionEvent.class, 
EventSerializer.fromBuffer(read, 
ClassLoader.getSystemClassLoader()).getClass());
+               read.recycle();
+       }
+
+       /**
+        * Tests that a spilled partition is correctly read back in via a 
spilled
+        * read view.
+        */
+       @Test
+       public void testConsumeSpillablePartitionSpilledDuringConsume() throws 
Exception {
+               ResultPartition parent = mock(ResultPartition.class);
+               SpillableSubpartition partition = new SpillableSubpartition(
+                       0,
+                       parent,
+                       ioManager);
+
+               Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), 
FreeingBufferRecycler.INSTANCE);
+               buffer.retain();
+               buffer.retain();
+
+               partition.add(buffer);
+               partition.add(buffer);
+               partition.add(buffer);
+               partition.finish();
+
+               AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
+               SpillableSubpartitionView reader = (SpillableSubpartitionView) 
partition.createReadView(new TestInfiniteBufferProvider(), listener);
+
+               // Initial notification
+               assertEquals(1, listener.getNumNotifiedBuffers());
+
+               Buffer read = reader.getNextBuffer();
+               assertNotNull(read);
+               read.recycle();
+               assertEquals(2, listener.getNumNotifiedBuffers());
+
+               // Spill now
+               assertEquals(2, partition.releaseMemory());
+
+               listener.awaitNotifications(4, 30_000);
+               assertEquals(4, listener.getNumNotifiedBuffers());
+
+               read = reader.getNextBuffer();
+               assertNotNull(read);
+               read.recycle();
+
+               read = reader.getNextBuffer();
+               assertNotNull(read);
+               read.recycle();
+
+               // End of partition
+               read = reader.getNextBuffer();
+               assertNotNull(read);
+               assertEquals(EndOfPartitionEvent.class, 
EventSerializer.fromBuffer(read, 
ClassLoader.getSystemClassLoader()).getClass());
+               read.recycle();
+       }
+
+       private static class AwaitableBufferAvailablityListener implements 
BufferAvailabilityListener {
+
+               private long numNotifiedBuffers;
+
+               @Override
+               public void notifyBuffersAvailable(long numBuffers) {
+                       numNotifiedBuffers += numBuffers;
+               }
+
+               long getNumNotifiedBuffers() {
+                       return numNotifiedBuffers;
+               }
+
+               void awaitNotifications(long awaitedNumNotifiedBuffers, long 
timeoutMillis) throws InterruptedException {
+                       long deadline = System.currentTimeMillis() + 
timeoutMillis;
+                       while (numNotifiedBuffers < awaitedNumNotifiedBuffers 
&& System.currentTimeMillis() < deadline) {
+                               Thread.sleep(1);
+                       }
+               }
+       }
 }

Reply via email to