This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a7cf24383be9f310fb5ccc5a032721421fa45791
Author: Stephan Ewen <[email protected]>
AuthorDate: Thu Apr 25 13:09:40 2019 +0200

    [FLINK-12070] [network] Change Bounded Blocking Subpartition Implementation to Memory Mapped Files
    
    This commit consists of multiple steps (originally individual commits) that were squashed into
    one commit after review, to form a self-contained (compiling, test-passing) commit.
    
    Part 1: Move tests that were specific to pipelined (and buffer-storing) implementations into the
        specific test classes.
    
        Several test assumptions were tied to specific implementations of the partitions, rather than
        testing required behavior:
    
          - The availability of statistics after disposing partitions is not necessary and requires
            extra effort to guarantee in certain implementations.
          - The fact that the number of bytes in a partition only updates on consumption seems wrong
            and can only apply to "consume once" implementations. This should not be assumed in a
            test base.
    
    Part 2: Make the "release parent releases readers" test specific to pipelined partitions.
    
        For pipelined partitions, the release call on the SubPartition causes the reader (view) to be
        released immediately.
        For bounded partitions, this is neither required nor desirable, because releasing too eagerly
        can cause segmentation faults in the case of direct byte buffers and memory mapped files.
    
    Part 3: Remove old SpillableSubpartition and SpillableSubpartitionView
    
    Part 4: Move code specific to pipelined subpartitions into the PipelinedSubpartition class.
    
    Part 5: Implement new BoundedBlockingSubpartition
    
    Part 6: Remove no longer applicable memory release test for blocking partitions
    
    Part 7: Add tests for BoundedBlockingSubpartition
---
 .../flink/core/memory/MemorySegmentFactory.java    |  14 +
 .../runtime/io/network/buffer/BufferConsumer.java  |  11 +-
 .../partition/BoundedBlockingSubpartition.java     | 274 +++++++
 .../BoundedBlockingSubpartitionReader.java         | 144 ++++
 .../io/network/partition/BufferToByteBuffer.java   | 130 ++++
 .../io/network/partition/MemoryMappedBuffers.java  | 279 +++++++
 .../runtime/io/network/partition/PageSizeUtil.java | 113 +++
 .../network/partition/PipelinedSubpartition.java   |  70 ++
 .../io/network/partition/ResultPartition.java      |  38 +-
 .../io/network/partition/ResultSubpartition.java   |  83 +--
 .../network/partition/SpillableSubpartition.java   | 312 --------
 .../partition/SpillableSubpartitionView.java       | 280 --------
 .../network/partition/SpilledSubpartitionView.java | 303 --------
 .../io/network/buffer/BufferBuilderTestUtils.java  |  29 +
 .../partition/BoundedBlockingSubpartitionTest.java | 105 +++
 .../BoundedBlockingSubpartitionWriteReadTest.java  | 199 +++++
 .../network/partition/BufferToByteBufferTest.java  |  80 +++
 .../network/partition/MemoryMappedBuffersTest.java | 162 +++++
 .../partition/PipelinedSubpartitionTest.java       |  45 ++
 .../io/network/partition/ResultPartitionTest.java  |   5 -
 .../partition/SpillableSubpartitionTest.java       | 800 ---------------------
 .../io/network/partition/SubpartitionTestBase.java | 109 ++-
 22 files changed, 1778 insertions(+), 1807 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 48b9a20..9a20e0e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -116,4 +116,18 @@ public final class MemorySegmentFactory {
                return new HybridMemorySegment(memory, owner);
        }
 
+       /**
+        * Creates a memory segment that wraps the off-heap memory backing the given ByteBuffer.
+        * Note that the ByteBuffer needs to be a <i>direct ByteBuffer</i>.
+        *
+        * <p>This method is intended to be used for components which pool memory and create
+        * memory segments around long-lived memory regions.
+        *
+        * @param memory The byte buffer with the off-heap memory to be represented by the memory segment.
+        * @return A new memory segment representing the given off-heap memory.
+        */
+       public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
+               return new HybridMemorySegment(memory);
+       }
+
 }
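
For illustration only (not part of the patch): a minimal sketch of how the new factory
method could be used to wrap a pooled direct buffer without a copy. The class name and
buffer size are made up.

    import java.nio.ByteBuffer;
    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.core.memory.MemorySegmentFactory;

    class WrapOffHeapExample {
        static MemorySegment wrapPooledRegion() {
            // a long-lived off-heap region, e.g. from a pool or a mapped file
            ByteBuffer direct = ByteBuffer.allocateDirect(64 * 1024);

            // wrap without copying; the segment shares the off-heap memory
            return MemorySegmentFactory.wrapOffHeapMemory(direct);
        }
    }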
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index affcc74..b58a627 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -61,9 +61,16 @@ public class BufferConsumer implements Closeable {
         * Constructs {@link BufferConsumer} instance with static content.
         */
        public BufferConsumer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
+               this(memorySegment, recycler, memorySegment.size(), isBuffer);
+       }
+
+       /**
+        * Constructs {@link BufferConsumer} instance with static content of a certain size.
+        */
+       public BufferConsumer(MemorySegment memorySegment, BufferRecycler recycler, int size, boolean isBuffer) {
                this(new NetworkBuffer(checkNotNull(memorySegment), checkNotNull(recycler), isBuffer),
-                       () -> -memorySegment.size(),
-                       0);
+                               () -> -size,
+                               0);
                checkState(memorySegment.size() > 0);
                checkState(isFinished(), "BufferConsumer with static size must be finished after construction!");
        }
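
A hedged sketch of what the new constructor enables: a finished buffer that exposes only
the written prefix of a larger segment. The sizes are made up; FreeingBufferRecycler is
used here only because it needs no pool.

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.core.memory.MemorySegmentFactory;
    import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
    import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;

    class StaticSizeConsumerExample {
        static BufferConsumer prefixConsumer() {
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(4096);
            // expose only the first 100 bytes as a finished buffer, not all 4096
            return new BufferConsumer(segment, FreeingBufferRecycler.INSTANCE, 100, true);
        }
    }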
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
new file mode 100644
index 0000000..76c7a2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the ResultSubpartition for a bounded result transferred
+ * in a blocking manner: The result is first produced, then consumed.
+ * The result can be consumed multiple times.
+ *
+ * <p>The implementation creates a temporary memory mapped file, writes all buffers to that
+ * memory, and serves the result from that memory. The kernel backs the mapped memory region
+ * with physical memory and file space incrementally as new pages are filled.
+ *
+ * <h2>Important Notes on Thread Safety</h2>
+ *
+ * <p>This class does not synchronize every buffer access. It assumes the threading model of the
+ * Flink network stack and is not thread-safe beyond that.
+ *
+ * <p>This class assumes a single writer thread that adds buffers, flushes, and finishes the write
+ * phase. That same thread is also assumed to perform the partition release, if the release happens
+ * during the write phase.
+ *
+ * <p>The implementation supports multiple concurrent readers, but assumes a single thread per
+ * reader. That same thread must also release the reader. In particular, after the reader has been
+ * released, no buffers obtained from it may be accessed any more, or segmentation faults might occur.
+ *
+ * <p>The method calls to create readers, dispose readers, and dispose the partition are
+ * thread-safe vis-a-vis each other.
+ */
+final class BoundedBlockingSubpartition extends ResultSubpartition {
+
+       /** This lock guards the creation of readers and disposal of the memory mapped file. */
+       private final Object lock = new Object();
+
+       /** The current buffer, may be filled further over time. */
+       @Nullable
+       private BufferConsumer currentBuffer;
+
+       /** The memory that we store the data in, via a memory mapped file. */
+       private final MemoryMappedBuffers memory;
+
+       /** All created and not yet released readers. */
+       @GuardedBy("lock")
+       private final Set<BoundedBlockingSubpartitionReader> readers;
+
+       /** Counter for the number of data buffers (not events!) written. */
+       private int numDataBuffersWritten;
+
+       /** The counter for the number of data buffers and events. */
+       private int numBuffersAndEventsWritten;
+
+       /** Flag indicating whether the writing has finished and this is now available for read. */
+       private boolean isFinished;
+
+       /** Flag indicating whether the subpartition has been released. */
+       private boolean isReleased;
+
+       /**
+        * Common constructor.
+        */
+       public BoundedBlockingSubpartition(
+                       int index,
+                       ResultPartition parent,
+                       Path filePath) throws IOException {
+
+               this(index, parent, MemoryMappedBuffers.create(filePath));
+       }
+
+       /**
+        * Constructor for testing, to pass in custom MemoryMappedBuffers.
+        */
+       @VisibleForTesting
+       BoundedBlockingSubpartition(
+                       int index,
+                       ResultPartition parent,
+                       MemoryMappedBuffers memory) throws IOException {
+
+               super(index, parent);
+
+               this.memory = checkNotNull(memory);
+               this.readers = new HashSet<>();
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Checks if writing is finished.
+        * Readers cannot be created until writing is finished, and no further writes can happen after that.
+        */
+       public boolean isFinished() {
+               return isFinished;
+       }
+
+       @Override
+       public boolean isReleased() {
+               return isReleased;
+       }
+
+       @Override
+       public boolean add(BufferConsumer bufferConsumer) throws IOException {
+               if (isFinished()) {
+                       bufferConsumer.close();
+                       return false;
+               }
+
+               flushCurrentBuffer();
+               currentBuffer = bufferConsumer;
+               return true;
+       }
+
+       @Override
+       public void flush() {
+               // unfortunately, the signature of flush() does not allow for any exceptions, so we
+               // must resort to the discouraged pattern of wrapping them in a runtime exception
+               try {
+                       flushCurrentBuffer();
+               }
+               catch (IOException e) {
+                       throw new FlinkRuntimeException(e.getMessage(), e);
+               }
+       }
+
+       private void flushCurrentBuffer() throws IOException {
+               if (currentBuffer != null) {
+                       writeAndCloseBufferConsumer(currentBuffer);
+                       currentBuffer = null;
+               }
+       }
+
+       private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
+               try {
+                       final Buffer buffer = bufferConsumer.build();
+                       try {
+                               memory.writeBuffer(buffer);
+
+                               numBuffersAndEventsWritten++;
+                               if (buffer.isBuffer()) {
+                                       numDataBuffersWritten++;
+                               }
+                       }
+                       finally {
+                               buffer.recycleBuffer();
+                       }
+               }
+               finally {
+                       bufferConsumer.close();
+               }
+       }
+
+       @Override
+       public void finish() throws IOException {
+               checkState(!isReleased, "data partition already released");
+               checkState(!isFinished, "data partition already finished");
+
+               isFinished = true;
+               flushCurrentBuffer();
+               writeAndCloseBufferConsumer(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE));
+               memory.finishWrite();
+       }
+
+       @Override
+       public void release() throws IOException {
+               synchronized (lock) {
+                       if (isReleased) {
+                               return;
+                       }
+
+                       isReleased = true;
+                       isFinished = true; // for fail fast writes
+
+                       checkReaderReferencesAndDispose();
+               }
+       }
+
+       @Override
+       public ResultSubpartitionView createReadView(BufferAvailabilityListener availability) throws IOException {
+               synchronized (lock) {
+                       checkState(!isReleased, "data partition already released");
+                       checkState(isFinished, "writing of blocking partition not yet finished");
+
+                       availability.notifyDataAvailable();
+
+                       final MemoryMappedBuffers.BufferSlicer memoryReader = memory.getFullBuffers();
+                       final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader(
+                                       this, memoryReader, numDataBuffersWritten);
+                       readers.add(reader);
+                       return reader;
+               }
+       }
+
+       void releaseReaderReference(BoundedBlockingSubpartitionReader reader) throws IOException {
+               synchronized (lock) {
+                       if (readers.remove(reader) && isReleased) {
+                               checkReaderReferencesAndDispose();
+                       }
+               }
+       }
+
+       @GuardedBy("lock")
+       private void checkReaderReferencesAndDispose() throws IOException {
+               assert Thread.holdsLock(lock);
+
+               // To avoid lingering memory mapped files (large resource footprint), we don't
+               // wait for the GC to unmap the files, but use a Netty utility to directly unmap them.
+               // To avoid segmentation faults, we need to wait until all readers have been released.
+
+               if (readers.isEmpty()) {
+                       memory.close();
+               }
+       }
+
+       // ------------------------------ legacy ----------------------------------
+
+       @Override
+       public int releaseMemory() throws IOException {
+               return 0;
+       }
+
+       // ---------------------------- statistics --------------------------------
+
+       @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return 0;
+       }
+
+       @Override
+       protected long getTotalNumberOfBuffers() {
+               return numBuffersAndEventsWritten;
+       }
+
+       @Override
+       protected long getTotalNumberOfBytes() {
+               return memory.getSize();
+       }
+
+       int getBuffersInBacklog() {
+               return numDataBuffersWritten;
+       }
+}
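
To make the intended lifecycle concrete, a sketch in test style (same-package access is
assumed; parent, tempFile, someBufferConsumer, and listener are placeholders):

    // write phase, single writer thread
    BoundedBlockingSubpartition partition =
            new BoundedBlockingSubpartition(0, parent, tempFile.toPath());
    partition.add(someBufferConsumer);   // flushes the previously added buffer to the mapped file
    partition.finish();                  // appends EndOfPartitionEvent and seals the file

    // read phase, possibly several readers, one thread each
    ResultSubpartitionView view = partition.createReadView(listener);
    // ... consume via view.getNextBuffer() ...
    view.releaseAllResources();          // the file is unmapped once all readers are released
    partition.release();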
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
new file mode 100644
index 0000000..d6c6834
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition.
+ */
+final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView {
+
+       /** The result subpartition that we read. */
+       private final BoundedBlockingSubpartition parent;
+
+       /** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */
+       @Nullable
+       private Buffer nextBuffer;
+
+       /** The reader/decoder to the memory mapped region with the data we currently read from.
+        * Null once the reader is empty or disposed. */
+       @Nullable
+       private BufferSlicer memory;
+
+       /** The remaining number of data buffers (not events) in the result. */
+       private int dataBufferBacklog;
+
+       /** Flag whether this reader is released. */
+       private boolean isReleased;
+
+       /**
+        * Creates a reader for the given subpartition, reading from the given memory region.
+        */
+       BoundedBlockingSubpartitionReader(
+                       BoundedBlockingSubpartition parent,
+                       BufferSlicer memory,
+                       int numDataBuffers) {
+
+               checkArgument(numDataBuffers >= 0);
+
+               this.parent = checkNotNull(parent);
+               this.memory = checkNotNull(memory);
+               this.dataBufferBacklog = numDataBuffers;
+
+               this.nextBuffer = memory.sliceNextBuffer();
+       }
+
+       @Nullable
+       @Override
+       public BufferAndBacklog getNextBuffer() throws IOException {
+               final Buffer current = nextBuffer; // copy reference to stack
+
+               if (current == null) {
+                       // as per contract, we must return null when the reader is empty,
+                       // but also in case the reader is disposed (rather than throwing an exception)
+                       return null;
+               }
+               if (current.isBuffer()) {
+                       dataBufferBacklog--;
+               }
+
+               assert memory != null;
+               nextBuffer = memory.sliceNextBuffer();
+
+               return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog);
+       }
+
+       @Override
+       public void notifyDataAvailable() {
+               throw new IllegalStateException("No data should become available on a blocking partition during consumption.");
+       }
+
+       @Override
+       public void notifySubpartitionConsumed() throws IOException {
+               parent.onConsumedSubpartition();
+       }
+
+       @Override
+       public void releaseAllResources() throws IOException {
+               // it is not a problem if this method executes multiple times
+               isReleased = true;
+
+               // nulling these fields means that the read method will fail fast
+               nextBuffer = null;
+               memory = null;
+
+               // Notify the parent that this one is released. This allows the parent to
+               // eventually release all resources (when all readers are done and the
+               // parent is disposed).
+               parent.releaseReaderReference(this);
+       }
+
+       @Override
+       public boolean isReleased() {
+               return isReleased;
+       }
+
+       @Override
+       public boolean nextBufferIsEvent() {
+               return nextBuffer != null && !nextBuffer.isBuffer();
+       }
+
+       @Override
+       public boolean isAvailable() {
+               return nextBuffer != null;
+       }
+
+       @Override
+       public Throwable getFailureCause() {
+               // we can never throw an error after this was created
+               return null;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
+                               parent.parent.getPartitionId(),
+                               parent.index);
+       }
+}
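
A minimal consumption loop against such a reader (sketch; view is a placeholder for a
ResultSubpartitionView obtained from createReadView):

    BufferAndBacklog next;
    while ((next = view.getNextBuffer()) != null) {
        Buffer buffer = next.buffer();
        try {
            // process the buffer; next.buffersInBacklog() data buffers are still ahead
        }
        finally {
            buffer.recycleBuffer();
        }
    }
    view.releaseAllResources();          // no buffer from this reader may be touched afterwards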
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java
new file mode 100644
index 0000000..4cdf41a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Puts a sequence of buffers into a ByteBuffer and gets them back out.
+ * This class handles the headers, length encoding, and memory slicing.
+ */
+final class BufferToByteBuffer {
+
+       // all fields and methods below here have package-private access to avoid bridge
+       // methods when accessing them from the nested classes
+
+       static final int HEADER_LENGTH = 8;
+
+       static final int HEADER_VALUE_IS_BUFFER = 0;
+
+       static final int HEADER_VALUE_IS_EVENT = 1;
+
+       static ByteBuffer checkAndConfigureByteBuffer(ByteBuffer buffer) {
+               checkArgument(buffer.position() == 0);
+               checkArgument(buffer.capacity() > 8);
+               checkArgument(buffer.limit() == buffer.capacity());
+
+               return buffer.order(ByteOrder.nativeOrder());
+       }
+
+       // ------------------------------------------------------------------------
+
+       static final class Writer {
+
+               private final ByteBuffer memory;
+
+               Writer(ByteBuffer memory) {
+                       this.memory = checkAndConfigureByteBuffer(memory);
+               }
+
+               public boolean writeBuffer(Buffer buffer) {
+                       final ByteBuffer memory = this.memory;
+                       final int bufferSize = buffer.getSize();
+
+                       if (memory.remaining() < bufferSize + HEADER_LENGTH) {
+                               return false;
+                       }
+
+                       memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
+                       memory.putInt(bufferSize);
+                       memory.put(buffer.getNioBufferReadable());
+                       return true;
+               }
+
+               public ByteBuffer complete() {
+                       memory.flip();
+                       return memory;
+               }
+
+               public int getNumBytes() {
+                       return memory.position();
+               }
+       }
+
+       static final class Reader {
+
+               private final ByteBuffer memory;
+
+               Reader(ByteBuffer memory) {
+                       this.memory = checkAndConfigureByteBuffer(memory);
+               }
+
+               @Nullable
+               public Buffer sliceNextBuffer() {
+                       final ByteBuffer memory = this.memory;
+                       final int remaining = memory.remaining();
+
+                       // we only check for the regular case where the data is exhausted;
+                       // all other cases can only occur if our write logic is wrong and will
+                       // already throw buffer underflow exceptions, which cause the read to fail.
+                       if (remaining == 0) {
+                               return null;
+                       }
+
+                       final int header = memory.getInt();
+                       final int size = memory.getInt();
+
+                       memory.limit(memory.position() + size);
+                       ByteBuffer buf = memory.slice();
+                       memory.position(memory.limit());
+                       memory.limit(memory.capacity());
+
+                       MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf);
+                       Buffer buffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
+                       buffer.setSize(size);
+
+                       if (header == HEADER_VALUE_IS_EVENT) {
+                               buffer.tagAsEvent();
+                       }
+
+                       return buffer;
+               }
+       }
+}
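
The encoding is an 8-byte header in native byte order (a 4-byte buffer/event flag
followed by a 4-byte length), then the payload. A same-package roundtrip sketch
(someBuffer stands for any non-empty network Buffer):

    ByteBuffer memory = ByteBuffer.allocateDirect(1024);
    BufferToByteBuffer.Writer writer = new BufferToByteBuffer.Writer(memory);
    writer.writeBuffer(someBuffer);                // returns false if it does not fit
    ByteBuffer written = writer.complete();        // flipped view over the written bytes

    BufferToByteBuffer.Reader reader = new BufferToByteBuffer.Reader(written.slice());
    Buffer readBack = reader.sliceNextBuffer();    // zero-copy slice over the payload
    Buffer none = reader.sliceNextBuffer();        // null once the data is exhausted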
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java
new file mode 100644
index 0000000..6bb031e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This class is largely a workaround for the fact that a memory mapped region in Java cannot
+ * be larger than 2GB (== signed 32 bit int max value). The class takes {@link Buffer Buffers} and
+ * writes them to several memory mapped regions, using the {@link BufferToByteBuffer}
+ * class.
+ *
+ * <h2>Usage</h2>
+ *
+ * <p>In the first phase, data is written by repeatedly calling
+ * {@link #writeBuffer(Buffer)}. That puts the data into the memory region of the memory
+ * mapped file. After writing, one must call {@link #finishWrite()}.
+ *
+ * <p>After that, the class can produce multiple {@link BufferSlicer} instances to re-read
+ * the data from the memory regions. Multiple slicers can read concurrently, but each slicer
+ * should be read from by a single thread.
+ *
+ * <p>Eventually, the resources must be disposed via {@link #close()}. After that,
+ * no reading can happen any more.
+ *
+ * <h2>Important!</h2>
+ *
+ * <p>This class performs absolutely no synchronization and relies on single-threaded access
+ * or externally synchronized access. Concurrent access around disposal may cause
+ * segmentation faults!
+ *
+ * <p>This class performs only limited sanity checks and assumes correct use by {@link BoundedBlockingSubpartition}
+ * and {@link BoundedBlockingSubpartitionReader}, such as writing first and reading only after.
+ * Violating these contracts results in NullPointerExceptions.
+ */
+class MemoryMappedBuffers implements Closeable {
+
+       /** Memory mappings should be at the granularity of page sizes, for efficiency. */
+       private static final int PAGE_SIZE = PageSizeUtil.getSystemPageSizeOrConservativeMultiple();
+
+       /** The encoder to the current memory mapped region we are writing to.
+        * This value is null once writing has finished or the buffers are disposed. */
+       @Nullable
+       private BufferToByteBuffer.Writer currentBuffer;
+
+       /** All memory mapped regions that are already full (completed). */
+       private final ArrayList<ByteBuffer> fullBuffers;
+
+       /** The file channel backing the memory mapped file. */
+       private final FileChannel file;
+
+       /** The path of the memory mapped file. */
+       private final Path filePath;
+
+       /** The offset where the next mapped region should start. */
+       private long nextMappingOffset;
+
+       /** The size of each mapped region. */
+       private final long mappingSize;
+
+       MemoryMappedBuffers(
+                       Path filePath,
+                       FileChannel fileChannel,
+                       int maxSizePerByteBuffer) throws IOException {
+
+               this.filePath = filePath;
+               this.file = fileChannel;
+               this.mappingSize = alignSize(maxSizePerByteBuffer);
+               this.fullBuffers = new ArrayList<>(4);
+
+               rollOverToNextBuffer();
+       }
+
+       void writeBuffer(Buffer buffer) throws IOException {
+               assert currentBuffer != null;
+
+               if (currentBuffer.writeBuffer(buffer)) {
+                       return;
+               }
+
+               rollOverToNextBuffer();
+
+               if (!currentBuffer.writeBuffer(buffer)) {
+                       throwTooLargeBuffer(buffer);
+               }
+       }
+
+       BufferSlicer getFullBuffers() {
+               assert currentBuffer == null;
+
+               final List<ByteBuffer> buffers = fullBuffers.stream()
+                               .map(ByteBuffer::slice)
+                               .collect(Collectors.toList());
+
+               return new BufferSlicer(buffers);
+       }
+
+       /**
+        * Finishes the current region and prevents further writes.
+        * After calling this method, further calls to {@link #writeBuffer(Buffer)} will fail.
+        */
+       void finishWrite() throws IOException {
+               assert currentBuffer != null;
+
+               fullBuffers.add(currentBuffer.complete());
+               currentBuffer = null; // fail further writes fast
+               file.close(); // won't map further regions from now on
+       }
+
+       /**
+        * Unmaps the file from memory and deletes the file.
+        * After calling this method, access to any ByteBuffer obtained from this instance
+        * will cause a segmentation fault.
+        */
+       public void close() throws IOException {
+               IOUtils.closeQuietly(file); // in case we dispose before finishing writes
+
+               for (ByteBuffer bb : fullBuffers) {
+                       PlatformDependent.freeDirectBuffer(bb);
+               }
+               fullBuffers.clear();
+
+               if (currentBuffer != null) {
+                       PlatformDependent.freeDirectBuffer(currentBuffer.complete());
+                       currentBuffer = null;
+               }
+
+               // To make this compatible with all versions of Windows, we must delay
+               // deleting the file until it is unmapped.
+               // See also https://stackoverflow.com/questions/11099295/file-flag-delete-on-close-and-memory-mapped-files/51649618#51649618
+
+               Files.delete(filePath);
+       }
+
+       /**
+        * Gets the number of bytes of all written data (including the metadata in the buffer headers).
+        */
+       long getSize() {
+               long size = 0L;
+               for (ByteBuffer bb : fullBuffers) {
+                       size += bb.remaining();
+               }
+               if (currentBuffer != null) {
+                       size += currentBuffer.getNumBytes();
+               }
+               return size;
+       }
+
+       private void rollOverToNextBuffer() throws IOException {
+               if (currentBuffer != null) {
+                       // we need to remember the original buffers, not any slices:
+                       // slices have no cleaner, and we need the cleaner to trigger explicit unmapping
+                       fullBuffers.add(currentBuffer.complete());
+               }
+
+               final ByteBuffer mapped = file.map(MapMode.READ_WRITE, nextMappingOffset, mappingSize);
+               currentBuffer = new Writer(mapped);
+               nextMappingOffset += mappingSize;
+       }
+
+       private void throwTooLargeBuffer(Buffer buffer) throws IOException {
+               throw new IOException(String.format(
+                               "The buffer (%d bytes) is larger than the maximum size of a memory buffer (%d bytes)",
+                               buffer.getSize(), mappingSize));
+       }
+
+       /**
+        * Rounds the size down to the next multiple of the {@link #PAGE_SIZE}.
+        * We need to round down here to not exceed the original maximum size value.
+        * Otherwise, values like INT_MAX would round up to overflow the valid maximum
+        * size of a memory mapping region in Java.
+        */
+       private static int alignSize(int maxRegionSize) {
+               checkArgument(maxRegionSize >= PAGE_SIZE);
+               return maxRegionSize - (maxRegionSize % PAGE_SIZE);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Reader
+       // ------------------------------------------------------------------------
+
+       /**
+        * The "reader" for the memory region. It slices a sequence of buffers from the
+        * sequence of mapped ByteBuffers.
+        */
+       static final class BufferSlicer {
+
+               /** The reader/decoder to the memory mapped region with the data we currently read from.
+                * Max 2GB large. Further regions may be in the {@link #furtherData} field. */
+               private BufferToByteBuffer.Reader data;
+
+               /** Further byte buffers, to handle cases where there is more data than fits into
+                * one mapped byte buffer (2GB = Integer.MAX_VALUE). */
+               private final Iterator<ByteBuffer> furtherData;
+
+               BufferSlicer(Iterable<ByteBuffer> data) {
+                       this.furtherData = data.iterator();
+                       this.data = new BufferToByteBuffer.Reader(furtherData.next());
+               }
+
+               @Nullable
+               public Buffer sliceNextBuffer() {
+                       // should only be null once empty or disposed, in which case this method
+                       // should not be called any more
+                       assert data != null;
+
+                       final Buffer next = data.sliceNextBuffer();
+                       if (next != null) {
+                               return next;
+                       }
+
+                       if (!furtherData.hasNext()) {
+                               return null;
+                       }
+
+                       data = new BufferToByteBuffer.Reader(furtherData.next());
+                       return sliceNextBuffer();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Factories
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates new MemoryMappedBuffers backed by a memory mapped file at the given path.
+        */
+       public static MemoryMappedBuffers create(Path memMappedFilePath) throws IOException {
+               return createWithRegionSize(memMappedFilePath, Integer.MAX_VALUE);
+       }
+
+       /**
+        * Creates new MemoryMappedBuffers backed by a memory mapped file at the given path.
+        * Each mapped region (= ByteBuffer) will be of the given size.
+        */
+       public static MemoryMappedBuffers createWithRegionSize(Path memMappedFilePath, int regionSize) throws IOException {
+               final FileChannel fileChannel = FileChannel.open(memMappedFilePath,
+                               StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+
+               return new MemoryMappedBuffers(memMappedFilePath, fileChannel, regionSize);
+       }
+}
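
Putting the phases together, a hedged usage sketch (tempPath and buffer are placeholders;
write, finish, slice, then close):

    MemoryMappedBuffers memory = MemoryMappedBuffers.create(tempPath);
    memory.writeBuffer(buffer);                    // may roll over to a new mapped region
    memory.finishWrite();                          // seals the data; no further writes

    MemoryMappedBuffers.BufferSlicer slicer = memory.getFullBuffers();
    for (Buffer b; (b = slicer.sliceNextBuffer()) != null; ) {
        b.recycleBuffer();                         // consume and recycle each sliced buffer
    }
    memory.close();                                // unmaps and deletes; slices become invalid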
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
new file mode 100644
index 0000000..1ce1a76
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+public final class PageSizeUtil {
+
+       /** Value indicating an unknown page size. */
+       public static final int PAGE_SIZE_UNKNOWN = -1;
+
+       /** The default page size on most systems. */
+       public static final int DEFAULT_PAGE_SIZE = 4 * 1024;
+
+       /** A conservative fallback value (64 KiB) that should be a multiple of the page size even
+        * in some uncommon cases of server installations with larger-than-usual page sizes. */
+       public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024;
+
+       /**
+        * Tries to get the system page size. If the page size cannot be determined, this
+        * returns {@link #PAGE_SIZE_UNKNOWN} (-1).
+        *
+        * <p>This internally relies on the presence of "unsafe" and the resolution via some
+        * Netty utilities.
+        */
+       public static int getSystemPageSize() {
+               try {
+                       return PageSizeUtilInternal.getSystemPageSize();
+               }
+               catch (Throwable t) {
+                       ExceptionUtils.rethrowIfFatalError(t);
+                       return PAGE_SIZE_UNKNOWN;
+               }
+       }
+
+       /**
+        * Tries to get the system page size. If the page size cannot be determined, this
+        * returns the {@link #DEFAULT_PAGE_SIZE}.
+        */
+       public static int getSystemPageSizeOrDefault() {
+               final int pageSize = getSystemPageSize();
+               return pageSize == PAGE_SIZE_UNKNOWN ? DEFAULT_PAGE_SIZE : pageSize;
+       }
+
+       /**
+        * Tries to get the system page size. If the page size cannot be determined, this
+        * returns the {@link #CONSERVATIVE_PAGE_SIZE_MULTIPLE}.
+        */
+       public static int getSystemPageSizeOrConservativeMultiple() {
+               final int pageSize = getSystemPageSize();
+               return pageSize == PAGE_SIZE_UNKNOWN ? CONSERVATIVE_PAGE_SIZE_MULTIPLE : pageSize;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /** This class is not meant to be instantiated. */
+       private PageSizeUtil() {}
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * All unsafe related code must be in a separate class, so that loading the outer class
+        * does not implicitly try to resolve the unsafe class.
+        */
+       @SuppressWarnings("all")
+       private static final class PageSizeUtilInternal {
+
+               static int getSystemPageSize() {
+                       Unsafe unsafe = unsafe();
+                       return unsafe == null ? PAGE_SIZE_UNKNOWN : unsafe.pageSize();
+               }
+
+               @Nullable
+               private static Unsafe unsafe() {
+                       if (PlatformDependent.hasUnsafe()) {
+                               return (Unsafe) AccessController.doPrivileged(
+                                               (PrivilegedAction<Object>) () -> UnsafeAccess.UNSAFE);
+                       }
+                       else {
+                               return null;
+                       }
+               }
+       }
+}
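
To make the page alignment concrete, a small worked example of the round-down used by
MemoryMappedBuffers#alignSize (the page size value is illustrative):

    int pageSize = PageSizeUtil.getSystemPageSizeOrConservativeMultiple();  // e.g. 4096
    int maxRegion = Integer.MAX_VALUE;                                      // 2_147_483_647
    int aligned = maxRegion - (maxRegion % pageSize);
    // for pageSize = 4096: 2_147_483_647 % 4096 == 4095, so aligned == 2_147_479_552,
    // a page multiple just below the 2 GB limit of a single mapped region in Java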
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index fe27d97..7394e6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -30,6 +31,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -56,6 +58,13 @@ class PipelinedSubpartition extends ResultSubpartition {
 
        // ------------------------------------------------------------------------
 
+       /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
+       private final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
+
+       /** The number of non-event buffers currently in this subpartition. */
+       @GuardedBy("buffers")
+       private int buffersInBacklog;
+
        /** The read view to consume this subpartition. */
        private PipelinedSubpartitionView readView;
 
@@ -68,6 +77,12 @@ class PipelinedSubpartition extends ResultSubpartition {
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
 
+       /** The total number of buffers (both data and event buffers). */
+       private long totalNumberOfBuffers;
+
+       /** The total number of bytes (both data and event buffers). */
+       private long totalNumberOfBytes;
+
        // ------------------------------------------------------------------------
 
        PipelinedSubpartition(int index, ResultPartition parent) {
@@ -300,6 +315,61 @@ class PipelinedSubpartition extends ResultSubpartition {
                }
        }
 
+       @Override
+       protected long getTotalNumberOfBuffers() {
+               return totalNumberOfBuffers;
+       }
+
+       @Override
+       protected long getTotalNumberOfBytes() {
+               return totalNumberOfBytes;
+       }
+
+       Throwable getFailureCause() {
+               return parent.getFailureCause();
+       }
+
+       private void updateStatistics(BufferConsumer buffer) {
+               totalNumberOfBuffers++;
+       }
+
+       private void updateStatistics(Buffer buffer) {
+               totalNumberOfBytes += buffer.getSize();
+       }
+
+       @GuardedBy("buffers")
+       private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
+               assert Thread.holdsLock(buffers);
+               if (isBuffer) {
+                       buffersInBacklog--;
+               }
+       }
+
+       /**
+        * Increases the number of non-event buffers by one after adding a non-event
+        * buffer into this subpartition.
+        */
+       @GuardedBy("buffers")
+       private void increaseBuffersInBacklog(BufferConsumer buffer) {
+               assert Thread.holdsLock(buffers);
+
+               if (buffer != null && buffer.isBuffer()) {
+                       buffersInBacklog++;
+               }
+       }
+
+       /**
+        * Gets the number of non-event buffers in this subpartition.
+        *
+        * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
+        * scenarios since it does not make any concurrency guarantees.
+        */
+       @SuppressWarnings("FieldAccessNotGuarded")
+       @VisibleForTesting
+       public int getBuffersInBacklog() {
+               return buffersInBacklog;
+       }
+
        private boolean shouldNotifyDataAvailable() {
                // Notify only when we added first finished buffer.
                return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 24ce27e..1ff1ec5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -150,10 +152,7 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
                // Create the subpartitions.
                switch (partitionType) {
                        case BLOCKING:
-                               for (int i = 0; i < subpartitions.length; i++) {
-                                       subpartitions[i] = new SpillableSubpartition(i, this, ioManager);
-                               }
-
+                               initializeBoundedBlockingPartitions(subpartitions, this, ioManager);
                                break;
 
                        case PIPELINED:
@@ -466,4 +465,35 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
                        hasNotifiedPipelinedConsumers = true;
                }
        }
+
+       private static void initializeBoundedBlockingPartitions(
+                       ResultSubpartition[] subpartitions,
+                       ResultPartition parent,
+                       IOManager ioManager) {
+
+               int i = 0;
+               try {
+                       for (; i < subpartitions.length; i++) {
+                               subpartitions[i] = new BoundedBlockingSubpartition(
+                                               i, parent, ioManager.createChannel().getPathFile().toPath());
+                       }
+               }
+               catch (IOException e) {
+                       // undo all the work so that a failed constructor does not leave any resources
+                       // in need of disposal
+                       releasePartitionsQuietly(subpartitions, i);
+
+                       // this is not good; we should not be forced to wrap this in a runtime exception.
+                       // the fact that the ResultPartition and Task constructors (which call this) do not
+                       // tolerate any exceptions is incompatible with eager initialization of resources (RAII).
+                       throw new FlinkRuntimeException(e);
+               }
+       }
+
+       private static void releasePartitionsQuietly(ResultSubpartition[] partitions, int until) {
+               for (int i = 0; i < until; i++) {
+                       final ResultSubpartition subpartition = partitions[i];
+                       ExceptionUtils.suppressExceptions(subpartition::release);
+               }
+       }
 }
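
The initialize-or-release-everything pattern above generalizes; a hedged sketch with a
hypothetical acquire() and resource array:

    int created = 0;
    try {
        for (; created < resources.length; created++) {
            resources[created] = acquire(created);                       // may throw IOException
        }
    }
    catch (IOException e) {
        for (int i = 0; i < created; i++) {
            ExceptionUtils.suppressExceptions(resources[i]::release);    // best-effort cleanup
        }
        throw new FlinkRuntimeException(e);
    }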
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 58a1402..920ce8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -22,10 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.io.IOException;
-import java.util.ArrayDeque;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,41 +37,19 @@ public abstract class ResultSubpartition {
        /** The parent partition this subpartition belongs to. */
        protected final ResultPartition parent;
 
-       /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
-       protected final ArrayDeque<BufferConsumer> buffers = new ArrayDeque<>();
-
-       /** The number of non-event buffers currently in this subpartition. */
-       @GuardedBy("buffers")
-       private int buffersInBacklog;
-
        // - Statistics ----------------------------------------------------------
 
-       /** The total number of buffers (both data and event buffers). */
-       private long totalNumberOfBuffers;
-
-       /** The total number of bytes (both data and event buffers). */
-       private long totalNumberOfBytes;
-
        public ResultSubpartition(int index, ResultPartition parent) {
                this.index = index;
                this.parent = parent;
        }
 
-       protected void updateStatistics(BufferConsumer buffer) {
-               totalNumberOfBuffers++;
-       }
-
-       protected void updateStatistics(Buffer buffer) {
-               totalNumberOfBytes += buffer.getSize();
-       }
-
-       protected long getTotalNumberOfBuffers() {
-               return totalNumberOfBuffers;
-       }
+       /**
+        * Gets the total number of buffers (data buffers plus events).
+        */
+       protected abstract long getTotalNumberOfBuffers();
 
-       protected long getTotalNumberOfBytes() {
-               return totalNumberOfBytes;
-       }
+       protected abstract long getTotalNumberOfBytes();
 
        /**
         * Notifies the parent partition about a consumed {@link ResultSubpartitionView}.
@@ -83,10 +58,6 @@ public abstract class ResultSubpartition {
                parent.onConsumedSubpartition(index);
        }
 
-       protected Throwable getFailureCause() {
-               return parent.getFailureCause();
-       }
-
        /**
         * Adds the given buffer.
         *
@@ -123,9 +94,7 @@ public abstract class ResultSubpartition {
         * scenarios since it does not make any concurrency guarantees.
         */
        @VisibleForTesting
-       public int getBuffersInBacklog() {
-               return buffersInBacklog;
-       }
+       abstract int getBuffersInBacklog();
 
        /**
         * Makes a best effort to get the current size of the queue.
@@ -134,38 +103,6 @@ public abstract class ResultSubpartition {
         */
        public abstract int unsynchronizedGetNumberOfQueuedBuffers();
 
-       /**
-        * Decreases the number of non-event buffers by one after fetching a non-event
-        * buffer from this subpartition (for access by the subpartition views).
-        *
-        * @return backlog after the operation
-        */
-       public int decreaseBuffersInBacklog(Buffer buffer) {
-               synchronized (buffers) {
-                       return decreaseBuffersInBacklogUnsafe(buffer != null && buffer.isBuffer());
-               }
-       }
-
-       protected int decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
-               assert Thread.holdsLock(buffers);
-               if (isBuffer) {
-                       buffersInBacklog--;
-               }
-               return buffersInBacklog;
-       }
-
-       /**
-        * Increases the number of non-event buffers by one after adding a non-event
-        * buffer into this subpartition.
-        */
-       protected void increaseBuffersInBacklog(BufferConsumer buffer) {
-               assert Thread.holdsLock(buffers);
-
-               if (buffer != null && buffer.isBuffer()) {
-                       buffersInBacklog++;
-               }
-       }
-
        // ------------------------------------------------------------------------
 
        /**
@@ -201,6 +138,14 @@ public abstract class ResultSubpartition {
                public boolean nextBufferIsEvent() {
                        return nextBufferIsEvent;
                }
+
+               public static BufferAndBacklog fromBufferAndLookahead(Buffer current, Buffer lookahead, int backlog) {
+                       return new BufferAndBacklog(
+                                       current,
+                                       lookahead != null,
+                                       backlog,
+                                       lookahead != null && !lookahead.isBuffer());
+               }
        }
 
 }
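
For illustration, here is a minimal sketch (not part of this commit) of how a reader implementation might use the new BufferAndBacklog.fromBufferAndLookahead factory: availability and the next-buffer-is-event flag are derived entirely from a pre-fetched lookahead buffer. The class LookaheadReaderSketch and its fields are hypothetical; only the factory method itself comes from the change above.

    import javax.annotation.Nullable;

    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;

    class LookaheadReaderSketch {

        @Nullable
        private Buffer lookahead;       // pre-fetched next buffer, null once exhausted

        private int dataBufferBacklog;  // remaining unconsumed data (non-event) buffers

        @Nullable
        BufferAndBacklog pollNext(@Nullable Buffer nextFromStorage) {
            final Buffer current = lookahead;
            if (current == null) {
                return null;            // nothing left to hand out
            }
            lookahead = nextFromStorage;
            if (current.isBuffer()) {
                dataBufferBacklog--;    // events do not count towards the backlog
            }
            // isMoreAvailable == (lookahead != null); nextBufferIsEvent == lookahead is an event
            return BufferAndBacklog.fromBufferAndLookahead(current, lookahead, dataBufferBacklog);
        }
    }
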
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
deleted file mode 100644
index 9f696ad..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A spillable sub partition starts out in-memory and spills to disk if asked
- * to do so.
- *
- * <p>Buffers for the partition come from a {@link BufferPool}. The buffer pool
- * is also responsible to trigger the release of the buffers if it needs them
- * back. At this point, the spillable sub partition will write all in-memory
- * buffers to disk. All added buffers after that point directly go to disk.
- *
- * <p>This partition type is used for {@link ResultPartitionType#BLOCKING}
- * results that are fully produced before they can be consumed. At the point
- * when they are consumed, the buffers are (i) all in-memory, (ii) currently
- * being spilled to disk, or (iii) completely spilled to disk. Depending on
- * this state, different reader variants are returned (see
- * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
- *
- * <p>Since the network buffer pool size for outgoing partitions is usually
- * quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL}
- * and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters
- * for bounded channels or from the default values of
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, and
- * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, most spillable partitions
- * will be spilled for real-world data sets.
- *
- * <p>Note on thread safety. Synchronizing on {@code buffers} is used to synchronize
- * writes and reads. Synchronizing on {@code this} is used against concurrent
- * {@link #add(BufferConsumer)} and clean ups {@link #release()} / {@link #finish()} which
- * also are touching {@code spillWriter}. Since we do not want to block reads during
- * spilling, we need those two synchronization. Probably this model could be simplified.
- */
-class SpillableSubpartition extends ResultSubpartition {
-
-       private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartition.class);
-
-       /** The I/O manager used for spilling buffers to disk. */
-       private final IOManager ioManager;
-
-       /** The writer used for spilling. As long as this is null, we are in-memory. */
-       private BufferFileWriter spillWriter;
-
-       /** Flag indicating whether the subpartition has been finished. */
-       private boolean isFinished;
-
-       /** Flag indicating whether the subpartition has been released. */
-       private volatile boolean isReleased;
-
-       /** The read view to consume this subpartition. */
-       private ResultSubpartitionView readView;
-
-       SpillableSubpartition(int index, ResultPartition parent, IOManager ioManager) {
-               super(index, parent);
-
-               this.ioManager = checkNotNull(ioManager);
-       }
-
-       @Override
-       public synchronized boolean add(BufferConsumer bufferConsumer) throws IOException {
-               return add(bufferConsumer, false);
-       }
-
-       private boolean add(BufferConsumer bufferConsumer, boolean forceFinishRemainingBuffers)
-                       throws IOException {
-               checkNotNull(bufferConsumer);
-
-               synchronized (buffers) {
-                       if (isFinished || isReleased) {
-                               bufferConsumer.close();
-                               return false;
-                       }
-
-                       buffers.add(bufferConsumer);
-                       // The number of buffers are needed later when creating
-                       // the read views. If you ever remove this line here,
-                       // make sure to still count the number of buffers.
-                       updateStatistics(bufferConsumer);
-                       increaseBuffersInBacklog(bufferConsumer);
-
-                       if (spillWriter != null) {
-                               spillFinishedBufferConsumers(forceFinishRemainingBuffers);
-                       }
-               }
-               return true;
-       }
-
-       @Override
-       public void flush() {
-               synchronized (buffers) {
-                       if (readView != null) {
-                               readView.notifyDataAvailable();
-                       }
-               }
-       }
-
-       @Override
-       public synchronized void finish() throws IOException {
-               synchronized (buffers) {
-                       if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true)) {
-                               isFinished = true;
-                       }
-
-                       flush();
-               }
-
-               // If we are spilling/have spilled, wait for the writer to finish
-               if (spillWriter != null) {
-                       spillWriter.close();
-               }
-               LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
-       }
-
-       @Override
-       public synchronized void release() throws IOException {
-               // view reference accessible outside the lock, but assigned inside the locked scope
-               final ResultSubpartitionView view;
-
-               synchronized (buffers) {
-                       if (isReleased) {
-                               return;
-                       }
-
-                       // Release all available buffers
-                       for (BufferConsumer buffer : buffers) {
-                               buffer.close();
-                       }
-                       buffers.clear();
-
-                       view = readView;
-
-                       // No consumer yet, we are responsible to clean everything up. If
-                       // one is available, the view is responsible is to clean up (see
-                       // below).
-                       if (view == null) {
-
-                               // TODO This can block until all buffers are written out to
-                               // disk if a spill is in-progress before deleting the file.
-                               // It is possibly called from the Netty event loop threads,
-                               // which can bring down the network.
-                               if (spillWriter != null) {
-                                       spillWriter.closeAndDelete();
-                               }
-                       }
-
-                       isReleased = true;
-               }
-
-               LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
-
-               if (view != null) {
-                       view.releaseAllResources();
-               }
-       }
-
-       @Override
-       public ResultSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
-               synchronized (buffers) {
-                       if (!isFinished) {
-                               throw new IllegalStateException("Subpartition has not been finished yet, " +
-                                       "but blocking subpartitions can only be consumed after they have " +
-                                       "been finished.");
-                       }
-
-                       if (readView != null) {
-                               throw new IllegalStateException("Subpartition is being or already has been " +
-                                       "consumed, but we currently allow subpartitions to only be consumed once.");
-                       }
-
-                       if (spillWriter != null) {
-                               readView = new SpilledSubpartitionView(
-                                       this,
-                                       parent.getBufferProvider().getMemorySegmentSize(),
-                                       spillWriter,
-                                       getTotalNumberOfBuffers(),
-                                       availabilityListener);
-                       } else {
-                               readView = new SpillableSubpartitionView(
-                                       this,
-                                       buffers,
-                                       ioManager,
-                                       parent.getBufferProvider().getMemorySegmentSize(),
-                                       availabilityListener);
-                       }
-                       return readView;
-               }
-       }
-
-       @Override
-       public int releaseMemory() throws IOException {
-               synchronized (buffers) {
-                       ResultSubpartitionView view = readView;
-
-                       if (view != null && view.getClass() == SpillableSubpartitionView.class) {
-                               // If there is a spillable view, it's the responsibility of the
-                               // view to release memory.
-                               SpillableSubpartitionView spillableView = (SpillableSubpartitionView) view;
-                               return spillableView.releaseMemory();
-                       } else if (spillWriter == null) {
-                               // No view and in-memory => spill to disk
-                               spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
-
-                               int numberOfBuffers = buffers.size();
-                               long spilledBytes = spillFinishedBufferConsumers(isFinished);
-                               int spilledBuffers = numberOfBuffers - buffers.size();
-
-                               LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.",
-                                       parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId());
-
-                               return spilledBuffers;
-                       }
-               }
-
-               // Else: We have already spilled and don't hold any buffers
-               return 0;
-       }
-
-       @VisibleForTesting
-       long spillFinishedBufferConsumers(boolean forceFinishRemainingBuffers) throws IOException {
-               long spilledBytes = 0;
-
-               while (!buffers.isEmpty()) {
-                       BufferConsumer bufferConsumer = buffers.getFirst();
-                       Buffer buffer = bufferConsumer.build();
-                       updateStatistics(buffer);
-                       int bufferSize = buffer.getSize();
-                       spilledBytes += bufferSize;
-
-                       // NOTE we may be in the process of finishing the subpartition where any buffer should
-                       // be treated as if it was finished!
-                       if (bufferConsumer.isFinished() || forceFinishRemainingBuffers) {
-                               if (bufferSize > 0) {
-                                       spillWriter.writeBlock(buffer);
-                               } else {
-                                       // If we skip a buffer for the spill writer, we need to adapt the backlog accordingly
-                                       decreaseBuffersInBacklog(buffer);
-                                       buffer.recycleBuffer();
-                               }
-                               bufferConsumer.close();
-                               buffers.poll();
-                       } else {
-                               // If there is already data, we need to spill it anyway, since we do not get this
-                               // slice from the buffer consumer again during the next build.
-                               // BEWARE: by doing so, we increase the actual number of buffers in the spill writer!
-                               if (bufferSize > 0) {
-                                       spillWriter.writeBlock(buffer);
-                                       increaseBuffersInBacklog(bufferConsumer);
-                               } else {
-                                       buffer.recycleBuffer();
-                               }
-
-                               return spilledBytes;
-                       }
-               }
-               return spilledBytes;
-       }
-
-       @Override
-       public boolean isReleased() {
-               return isReleased;
-       }
-
-       @Override
-       public int unsynchronizedGetNumberOfQueuedBuffers() {
-               // since we do not synchronize, the size may actually be lower than 0!
-               return Math.max(buffers.size(), 0);
-       }
-
-       @Override
-       public String toString() {
-               return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," +
-                               "%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]",
-                       index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
-                       getBuffersInBacklog(), isFinished, readView != null, spillWriter != null);
-       }
-
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
deleted file mode 100644
index 65790d7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-class SpillableSubpartitionView implements ResultSubpartitionView {
-
-       private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class);
-
-       /** The subpartition this view belongs to. */
-       private final SpillableSubpartition parent;
-
-       /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
-       private final ArrayDeque<BufferConsumer> buffers;
-
-       /** IO manager if we need to spill (for spilled case). */
-       private final IOManager ioManager;
-
-       /** Size of memory segments (for spilled case). */
-       private final int memorySegmentSize;
-
-       /**
-        * The buffer availability listener. As long as in-memory, notifications
-        * happen on a buffer per buffer basis as spilling may happen after a
-        * notification has been sent out.
-        */
-       private final BufferAvailabilityListener listener;
-
-       private final AtomicBoolean isReleased = new AtomicBoolean(false);
-
-       /** Remember the number of buffers this view was created with. */
-       private final long numBuffers;
-
-       /**
-        * The next buffer to hand out. Everytime this is set to a non-null value,
-        * a listener notification happens.
-        */
-       private BufferConsumer nextBuffer;
-
-       private volatile SpilledSubpartitionView spilledView;
-
-       SpillableSubpartitionView(
-               SpillableSubpartition parent,
-               ArrayDeque<BufferConsumer> buffers,
-               IOManager ioManager,
-               int memorySegmentSize,
-               BufferAvailabilityListener listener) {
-
-               this.parent = checkNotNull(parent);
-               this.buffers = checkNotNull(buffers);
-               this.ioManager = checkNotNull(ioManager);
-               this.memorySegmentSize = memorySegmentSize;
-               this.listener = checkNotNull(listener);
-
-               synchronized (buffers) {
-                       numBuffers = buffers.size();
-                       nextBuffer = buffers.poll();
-               }
-
-               if (nextBuffer != null) {
-                       listener.notifyDataAvailable();
-               }
-       }
-
-       int releaseMemory() throws IOException {
-               synchronized (buffers) {
-                       if (spilledView != null || nextBuffer == null) {
-                               // Already spilled or nothing in-memory
-                               return 0;
-                       } else {
-                               // We don't touch next buffer, because a notification has
-                               // already been sent for it. Only when it is consumed, will
-                               // it be recycled.
-
-                               // Create the spill writer and write all buffers to disk
-                               BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
-
-                               long spilledBytes = 0;
-
-                               int numBuffers = buffers.size();
-                               for (int i = 0; i < numBuffers; i++) {
-                                       try (BufferConsumer bufferConsumer = buffers.remove()) {
-                                               Buffer buffer = bufferConsumer.build();
-                                               checkState(bufferConsumer.isFinished(), "BufferConsumer must be finished before " +
-                                                       "spilling. Otherwise we would not be able to simply remove it from the queue. This should " +
-                                                       "be guaranteed by creating ResultSubpartitionView only once Subpartition isFinished.");
-                                               parent.updateStatistics(buffer);
-                                               spilledBytes += buffer.getSize();
-                                               spillWriter.writeBlock(buffer);
-                                       }
-                               }
-
-                               spilledView = new SpilledSubpartitionView(
-                                       parent,
-                                       memorySegmentSize,
-                                       spillWriter,
-                                       numBuffers,
-                                       listener);
-
-                               LOG.debug("Spilling {} bytes for sub partition {} of {}.",
-                                       spilledBytes,
-                                       parent.index,
-                                       parent.parent.getPartitionId());
-
-                               return numBuffers;
-                       }
-               }
-       }
-
-       @Nullable
-       @Override
-       public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
-               Buffer current = null;
-               boolean nextBufferIsEvent = false;
-               int newBacklog = 0; // this is always correct if current is non-null!
-               boolean isMoreAvailable = false;
-
-               synchronized (buffers) {
-                       if (isReleased.get()) {
-                               return null;
-                       } else if (nextBuffer != null) {
-                               current = nextBuffer.build();
-                               checkState(nextBuffer.isFinished(),
-                                       "We can only read from SpillableSubpartition after it was finished");
-
-                               newBacklog = parent.decreaseBuffersInBacklogUnsafe(nextBuffer.isBuffer());
-                               nextBuffer.close();
-                               nextBuffer = buffers.poll();
-
-                               if (nextBuffer != null) {
-                                       nextBufferIsEvent = !nextBuffer.isBuffer();
-                                       isMoreAvailable = true;
-                               }
-
-                               parent.updateStatistics(current);
-                               // if we are spilled (but still process a non-spilled nextBuffer), we don't know the
-                               // state of nextBufferIsEvent or whether more buffers are available
-                               if (spilledView == null) {
-                                       return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
-                               }
-                       }
-               } // else: spilled
-
-               SpilledSubpartitionView spilled = spilledView;
-               if (spilled != null) {
-                       if (current != null) {
-                               return new BufferAndBacklog(current, spilled.isAvailable(), newBacklog, spilled.nextBufferIsEvent());
-                       } else {
-                               return spilled.getNextBuffer();
-                       }
-               } else {
-                       throw new IllegalStateException("No in-memory buffers available, but also nothing spilled.");
-               }
-       }
-
-       @Override
-       public void notifyDataAvailable() {
-               // We do the availability listener notification one by one
-       }
-
-       @Override
-       public void releaseAllResources() throws IOException {
-               if (isReleased.compareAndSet(false, true)) {
-                       SpilledSubpartitionView spilled = spilledView;
-                       if (spilled != null) {
-                               spilled.releaseAllResources();
-                       }
-                       // we are never giving this buffer out in getNextBuffer(), so we need to clean it up
-                       synchronized (buffers) {
-                               if (nextBuffer != null) {
-                                       nextBuffer.close();
-                                       nextBuffer = null;
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void notifySubpartitionConsumed() throws IOException {
-               SpilledSubpartitionView spilled = spilledView;
-               if (spilled != null) {
-                       spilled.notifySubpartitionConsumed();
-               } else {
-                       parent.onConsumedSubpartition();
-               }
-       }
-
-       @Override
-       public boolean isReleased() {
-               SpilledSubpartitionView spilled = spilledView;
-               if (spilled != null) {
-                       return spilled.isReleased();
-               } else {
-                       return parent.isReleased() || isReleased.get();
-               }
-       }
-
-       @Override
-       public boolean nextBufferIsEvent() {
-               synchronized (buffers) {
-                       if (isReleased.get()) {
-                               return false;
-                       } else if (nextBuffer != null) {
-                               return !nextBuffer.isBuffer();
-                       }
-               } // else: spilled
-
-               checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled.");
-
-               return spilledView.nextBufferIsEvent();
-       }
-
-       @Override
-       public boolean isAvailable() {
-               synchronized (buffers) {
-                       if (nextBuffer != null) {
-                               return true;
-                       }
-                       else if (spilledView == null) {
-                               return false;
-                       }
-               } // else: spilled
-
-               return spilledView.isAvailable();
-       }
-
-       @Override
-       public Throwable getFailureCause() {
-               SpilledSubpartitionView spilled = spilledView;
-               if (spilled != null) {
-                       return spilled.getFailureCause();
-               } else {
-                       return parent.getFailureCause();
-               }
-       }
-
-       @Override
-       public String toString() {
-               boolean hasSpilled = spilledView != null;
-
-               return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of ResultPartition %s",
-                       parent.index,
-                       numBuffers,
-                       hasSpilled,
-                       parent.parent.getPartitionId());
-       }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
deleted file mode 100644
index f941e20..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
-import org.apache.flink.runtime.util.event.NotificationListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Reader for a spilled sub partition.
- *
- * <p>The partition availability listener is notified about available buffers
- * only when the spilling is done. Spilling is done async and if it is still
- * in progress, we wait with the notification until the spilling is done.
- *
- * <p>Reads of the spilled file are done in synchronously.
- */
-class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {
-
-       private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class);
-
-       /** The subpartition this view belongs to. */
-       private final SpillableSubpartition parent;
-
-       /** Writer for spills. */
-       private final BufferFileWriter spillWriter;
-
-       /** The synchronous file reader to do the actual I/O. */
-       @GuardedBy("this")
-       private final BufferFileReader fileReader;
-
-       /** The buffer pool to read data into. */
-       private final SpillReadBufferPool bufferPool;
-
-       /** Buffer availability listener. */
-       private final BufferAvailabilityListener availabilityListener;
-
-       /** The total number of spilled buffers. */
-       private final long numberOfSpilledBuffers;
-
-       /** Flag indicating whether all resources have been released. */
-       private AtomicBoolean isReleased = new AtomicBoolean();
-
-       /** The next buffer to hand out. */
-       @GuardedBy("this")
-       private Buffer nextBuffer;
-
-       /** Flag indicating whether a spill is still in progress. */
-       private volatile boolean isSpillInProgress = true;
-
-       SpilledSubpartitionView(
-               SpillableSubpartition parent,
-               int memorySegmentSize,
-               BufferFileWriter spillWriter,
-               long numberOfSpilledBuffers,
-               BufferAvailabilityListener availabilityListener) throws IOException {
-
-               this.parent = checkNotNull(parent);
-               this.bufferPool = new SpillReadBufferPool(2, memorySegmentSize);
-               this.spillWriter = checkNotNull(spillWriter);
-               this.fileReader = new SynchronousBufferFileReader(spillWriter.getChannelID(), false);
-               checkArgument(numberOfSpilledBuffers >= 0);
-               this.numberOfSpilledBuffers = numberOfSpilledBuffers;
-               this.availabilityListener = checkNotNull(availabilityListener);
-
-               // Check whether async spilling is still in progress. If not, this returns
-               // false and we can notify our availability listener about all available buffers.
-               // Otherwise, we notify only when the spill writer callback happens.
-               if (!spillWriter.registerAllRequestsProcessedListener(this)) {
-                       isSpillInProgress = false;
-                       availabilityListener.notifyDataAvailable();
-                       LOG.debug("No spilling in progress. Notified about {} available buffers.", numberOfSpilledBuffers);
-               } else {
-                       LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", numberOfSpilledBuffers);
-               }
-       }
-
-       /**
-        * This is the call back method for the spill writer. If a spill is still
-        * in progress when this view is created we wait until this method is called
-        * before we notify the availability listener.
-        */
-       @Override
-       public void onNotification() {
-               isSpillInProgress = false;
-               availabilityListener.notifyDataAvailable();
-               LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers);
-       }
-
-       @Nullable
-       @Override
-       public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
-               if (isSpillInProgress) {
-                       return null;
-               }
-
-               Buffer current;
-               boolean nextBufferIsEvent;
-               synchronized (this) {
-                       if (nextBuffer == null) {
-                               current = requestAndFillBuffer();
-                       } else {
-                               current = nextBuffer;
-                       }
-                       nextBuffer = requestAndFillBuffer();
-                       nextBufferIsEvent = nextBuffer != null && !nextBuffer.isBuffer();
-               }
-
-               if (current == null) {
-                       return null;
-               }
-
-               int newBacklog = parent.decreaseBuffersInBacklog(current);
-               return new BufferAndBacklog(current, newBacklog > 0 || nextBufferIsEvent, newBacklog, nextBufferIsEvent);
-       }
-
-       @Nullable
-       private Buffer requestAndFillBuffer() throws IOException, InterruptedException {
-               assert Thread.holdsLock(this);
-
-               if (fileReader.hasReachedEndOfFile()) {
-                       return null;
-               }
-               // TODO This is fragile as we implicitly expect that multiple calls to
-               // this method don't happen before recycling buffers returned earlier.
-               Buffer buffer = bufferPool.requestBufferBlocking();
-               fileReader.readInto(buffer);
-               return buffer;
-       }
-
-       @Override
-       public void notifyDataAvailable() {
-               // We do the availability listener notification either directly on
-               // construction of this view (when everything has been spilled) or
-               // as soon as spilling is done and we are notified about it in the
-               // #onNotification callback.
-       }
-
-       @Override
-       public void notifySubpartitionConsumed() throws IOException {
-               parent.onConsumedSubpartition();
-       }
-
-       @Override
-       public void releaseAllResources() throws IOException {
-               if (isReleased.compareAndSet(false, true)) {
-                       // TODO This can block until all buffers are written out to
-                       // disk if a spill is in-progress before deleting the file.
-                       // It is possibly called from the Netty event loop threads,
-                       // which can bring down the network.
-                       spillWriter.closeAndDelete();
-
-                       synchronized (this) {
-                               fileReader.close();
-                               if (nextBuffer != null) {
-                                       nextBuffer.recycleBuffer();
-                                       nextBuffer = null;
-                               }
-                       }
-
-                       bufferPool.destroy();
-               }
-       }
-
-       @Override
-       public boolean isReleased() {
-               return parent.isReleased() || isReleased.get();
-       }
-
-       @Override
-       public boolean nextBufferIsEvent() {
-               synchronized (this) {
-                       if (nextBuffer == null) {
-                               try {
-                                       nextBuffer = requestAndFillBuffer();
-                               } catch (Exception e) {
-                                       // we can ignore this here (we will get it again once getNextBuffer() is called)
-                                       return false;
-                               }
-                       }
-                       return nextBuffer != null && !nextBuffer.isBuffer();
-               }
-       }
-
-       @Override
-       public synchronized boolean isAvailable() {
-               if (nextBuffer != null) {
-                       return true;
-               }
-               return !fileReader.hasReachedEndOfFile();
-       }
-
-       @Override
-       public Throwable getFailureCause() {
-               return parent.getFailureCause();
-       }
-
-       @Override
-       public String toString() {
-               return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s",
-                       parent.index,
-                       numberOfSpilledBuffers,
-                       parent.parent.getPartitionId());
-       }
-
-       /**
-        * A buffer pool to provide buffer to read the file into.
-        *
-        * <p>This pool ensures that a consuming input gate makes progress in all cases, even when all
-        * buffers of the input gate buffer pool have been requested by remote 
input channels.
-        */
-       private static class SpillReadBufferPool implements BufferRecycler {
-
-               private final Queue<Buffer> buffers;
-
-               private boolean isDestroyed;
-
-               SpillReadBufferPool(int numberOfBuffers, int memorySegmentSize) {
-                       this.buffers = new ArrayDeque<>(numberOfBuffers);
-
-                       synchronized (buffers) {
-                               for (int i = 0; i < numberOfBuffers; i++) {
-                                       buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
-                                               memorySegmentSize, null), this));
-                               }
-                       }
-               }
-
-               @Override
-               public void recycle(MemorySegment memorySegment) {
-                       synchronized (buffers) {
-                               if (isDestroyed) {
-                                       memorySegment.free();
-                               } else {
-                                       buffers.add(new NetworkBuffer(memorySegment, this));
-                                       buffers.notifyAll();
-                               }
-                       }
-               }
-
-               private Buffer requestBufferBlocking() throws InterruptedException {
-                       synchronized (buffers) {
-                               while (true) {
-                                       if (isDestroyed) {
-                                               return null;
-                                       }
-
-                                       Buffer buffer = buffers.poll();
-
-                                       if (buffer != null) {
-                                               return buffer;
-                                       }
-                                       // Else: wait for a buffer
-                                       buffers.wait();
-                               }
-                       }
-               }
-
-               private void destroy() {
-                       synchronized (buffers) {
-                               isDestroyed = true;
-                               buffers.notifyAll();
-                       }
-               }
-       }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index 9706a86..7a68368 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Utility class for create not-pooled {@link BufferBuilder}.
@@ -83,4 +86,30 @@ public class BufferBuilderTestUtils {
                        FreeingBufferRecycler.INSTANCE,
                        false);
        }
+
+       public static Buffer buildBufferWithAscendingInts(int bufferSize, int numInts, int nextValue) {
+               final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+               for (int i = 0; i < numInts; i++) {
+                       seg.putIntLittleEndian(4 * i, nextValue++);
+               }
+
+               return new NetworkBuffer(seg, MemorySegment::free, true, 4 * numInts);
+       }
+
+       public static void validateBufferWithAscendingInts(Buffer buffer, int numInts, int nextValue) {
+               final ByteBuffer bb = buffer.getNioBufferReadable().order(ByteOrder.LITTLE_ENDIAN);
+
+               for (int i = 0; i < numInts; i++) {
+                       assertEquals(nextValue++, bb.getInt());
+               }
+       }
+
+       public static Buffer buildSomeBuffer() {
+               return buildSomeBuffer(1024);
+       }
+
+       public static Buffer buildSomeBuffer(int size) {
+               final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(size);
+               return new NetworkBuffer(seg, MemorySegment::free, true, size);
+       }
 }
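
As a usage illustration (hedged; the class AscendingIntsRoundTrip below is not part of the commit), the new helpers pair up as a build/validate round trip over little-endian ints:

    import org.apache.flink.runtime.io.network.buffer.Buffer;

    import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildBufferWithAscendingInts;
    import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.validateBufferWithAscendingInts;

    public class AscendingIntsRoundTrip {

        public static void main(String[] args) {
            // build a 4 KiB buffer holding the 100 ints 42, 43, ..., 141
            Buffer buffer = buildBufferWithAscendingInts(4096, 100, 42);

            // read the same sequence back; throws AssertionError on a mismatch
            validateBufferWithAscendingInts(buffer, 100, 42);

            // the backing memory segment is freed when the buffer is recycled
            buffer.recycleBuffer();
        }
    }
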
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
new file mode 100644
index 0000000..5051b90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Behavior tests for the {@link BoundedBlockingSubpartition} and the
+ * {@link BoundedBlockingSubpartitionReader}.
+ *
+ * <p>Full read / write tests for the partition and the reader are in
+ * {@link BoundedBlockingSubpartitionWriteReadTest}.
+ */
+public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
+
+       @ClassRule
+       public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
+
+       // ------------------------------------------------------------------------
+
+       @Test
+       public void testCreateReaderBeforeFinished() throws Exception {
+               final ResultSubpartition partition = createSubpartition();
+
+               try {
+                       partition.createReadView(new NoOpBufferAvailablityListener());
+                       fail("expected exception");
+               }
+               catch (IllegalStateException ignored) {}
+
+               partition.release();
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       ResultSubpartition createSubpartition() throws Exception {
+               final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+               return new BoundedBlockingSubpartition(0, resultPartition, tmpPath());
+       }
+
+       @Override
+       ResultSubpartition createFailingWritesSubpartition() throws Exception {
+               final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
+
+               return new BoundedBlockingSubpartition(
+                               0,
+                               resultPartition,
+                               FailingMemory.create());
+       }
+
+       // ------------------------------------------------------------------------
+
+       static Path tmpPath() throws IOException {
+               return new File(TMP_DIR.newFolder(), "subpartition").toPath();
+       }
+
+       // ------------------------------------------------------------------------
+
+       private static class FailingMemory extends MemoryMappedBuffers {
+
+               FailingMemory(Path path, FileChannel fc) throws IOException {
+                       super(path, fc, Integer.MAX_VALUE);
+               }
+
+               @Override
+               void writeBuffer(Buffer buffer) throws IOException {
+                       throw new IOException("test");
+               }
+
+               static FailingMemory create() throws IOException {
+                       Path p = tmpPath();
+                       FileChannel fc = FileChannel.open(p, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
+                       return new FailingMemory(p, fc);
+               }
+       }
+}
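
The contract pinned down by testCreateReaderBeforeFinished can be summarized in a hedged lifecycle sketch. The helper below is hypothetical and assumes it lives in the org.apache.flink.runtime.io.network.partition package (the subpartition class is package-private); only the constructor, add(), finish(), and createReadView() come from this commit.

    package org.apache.flink.runtime.io.network.partition;

    import java.nio.file.Path;

    import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

    class BoundedBlockingLifecycleSketch {

        static ResultSubpartitionView writeThenConsume(
                ResultPartition parent, Path dataFile, BufferConsumer data) throws Exception {

            BoundedBlockingSubpartition subpartition = new BoundedBlockingSubpartition(0, parent, dataFile);

            subpartition.add(data);   // write phase: buffers are added one by one
            subpartition.finish();    // seal the partition; reading earlier throws IllegalStateException

            // read phase: views may only be created on a finished bounded partition
            return subpartition.createReadView(() -> {});
        }
    }
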
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
new file mode 100644
index 0000000..46dbb05
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel.
+ */
+public class BoundedBlockingSubpartitionWriteReadTest {
+
+       @ClassRule
+       public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+       @Test
+       public void testWriteAndReadData() throws Exception {
+               final int numLongs = 15_000_000; // roughly 115 MiBytes
+
+               // setup
+               final BoundedBlockingSubpartition subpartition = createAndFillPartition(numLongs);
+
+               // test & check
+               final ResultSubpartitionView reader = subpartition.createReadView(() -> {});
+               readLongs(reader, numLongs, subpartition.getBuffersInBacklog());
+
+               // cleanup
+               reader.releaseAllResources();
+               subpartition.release();
+       }
+
+       @Test
+       public void testRead10ConsumersSequential() throws Exception {
+               final int numLongs = 10_000_000;
+
+               // setup
+               final BoundedBlockingSubpartition subpartition = createAndFillPartition(numLongs);
+
+               // test & check
+               for (int i = 0; i < 10; i++) {
+                       final ResultSubpartitionView reader = subpartition.createReadView(() -> {});
+                       readLongs(reader, numLongs, subpartition.getBuffersInBacklog());
+                       reader.releaseAllResources();
+               }
+
+               // cleanup
+               subpartition.release();
+       }
+
+       @Test
+       public void testRead10ConsumersConcurrent() throws Exception {
+               final int numLongs = 15_000_000;
+
+               // setup
+               final BoundedBlockingSubpartition subpartition = createAndFillPartition(numLongs);
+
+               // test
+               final LongReader[] readerThreads = createSubpartitionLongReaders(
+                               subpartition, 10, numLongs, subpartition.getBuffersInBacklog());
+               for (CheckedThread t : readerThreads) {
+                       t.start();
+               }
+
+               // check
+               for (CheckedThread t : readerThreads) {
+                       t.sync(); // this propagates assertion errors out from the threads
+               }
+
+               // cleanup
+               subpartition.release();
+       }
+
+       // ------------------------------------------------------------------------
+       //  common test passes
+       // ------------------------------------------------------------------------
+
+       private static void readLongs(ResultSubpartitionView reader, long numLongs, int numBuffers) throws Exception {
+               BufferAndBacklog next;
+               long expectedNextLong = 0L;
+               int nextExpectedBacklog = numBuffers - 1;
+
+               while ((next = reader.getNextBuffer()) != null && next.buffer().isBuffer()) {
+                       assertTrue(next.isMoreAvailable());
+                       assertEquals(nextExpectedBacklog, next.buffersInBacklog());
+
+                       ByteBuffer buffer = next.buffer().getNioBufferReadable();
+                       while (buffer.hasRemaining()) {
+                               assertEquals(expectedNextLong++, buffer.getLong());
+                       }
+
+                       nextExpectedBacklog--;
+               }
+
+               assertEquals(numLongs, expectedNextLong);
+               assertEquals(-1, nextExpectedBacklog);
+       }
+
+       // ------------------------------------------------------------------------
+       //  utils
+       // ------------------------------------------------------------------------
+
+       private static void writeLongs(BoundedBlockingSubpartition partition, long nums) throws IOException {
+               final MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(1024 * 1024);
+
+               long l = 0;
+               while (nums > 0) {
+                       int pos = 0;
+                       for (; nums > 0 && pos <= memory.size() - 8; pos += 8) {
+                               memory.putLongBigEndian(pos, l++);
+                               nums--;
+                       }
+
+                       partition.add(new BufferConsumer(memory, (ignored) -> {}, pos, true));
+
+                       // we need to flush after every buffer as long as the add() contract is that
+                       // buffers are immediately added and can be filled further after that (for low latency
+                       // streaming data exchanges)
+                       partition.flush();
+               }
+       }
+
+       private static BoundedBlockingSubpartition createAndFillPartition(long numLongs) throws IOException {
+               BoundedBlockingSubpartition subpartition = createSubpartition();
+               writeLongs(subpartition, numLongs);
+               subpartition.finish();
+               return subpartition;
+       }
+
+       private static BoundedBlockingSubpartition createSubpartition() throws IOException {
+               return new BoundedBlockingSubpartition(
+                               0,
+                               PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING),
+                               new File(TMP_FOLDER.newFolder(), "partitiondata").toPath());
+       }
+
+       private static LongReader[] createSubpartitionLongReaders(
+                       BoundedBlockingSubpartition subpartition,
+                       int numReaders,
+                       int numLongs,
+                       int numBuffers) throws IOException {
+
+               final LongReader[] readerThreads = new LongReader[numReaders];
+               for (int i = 0; i < numReaders; i++) {
+                       ResultSubpartitionView reader = subpartition.createReadView(() -> {});
+                       readerThreads[i] = new LongReader(reader, numLongs, numBuffers);
+               }
+               return readerThreads;
+       }
+
+       private static final class LongReader extends CheckedThread {
+
+               private final ResultSubpartitionView reader;
+
+               private final long numLongs;
+
+               private final int numBuffers;
+
+               LongReader(ResultSubpartitionView reader, long numLongs, int numBuffers) {
+                       this.reader = reader;
+                       this.numLongs = numLongs;
+                       this.numBuffers = numBuffers;
+               }
+
+               @Override
+               public void go() throws Exception {
+                       readLongs(reader, numLongs, numBuffers);
+               }
+       }
+}
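
Note on the threading pattern in the test above: all LongReader threads are started first, and the test then calls sync() on each one, so that assertion errors thrown on a reader thread are rethrown in the JUnit thread and fail the test. Flink's test utility org.apache.flink.core.testutils.CheckedThread provides exactly this; purely as an illustration (a sketch, not Flink's actual implementation), the pattern boils down to:

    // Illustrative sketch only: record any Throwable from go() and rethrow it
    // from sync() on the calling thread, so worker failures fail the test.
    abstract class SketchCheckedThread extends Thread {

        private volatile Throwable error;

        // the test logic runs here; any thrown Throwable is recorded
        public abstract void go() throws Exception;

        @Override
        public void run() {
            try {
                go();
            } catch (Throwable t) {
                error = t;
            }
        }

        // joins the thread and rethrows a recorded failure on the caller thread
        public void sync() throws Exception {
            join();
            if (error instanceof Exception) {
                throw (Exception) error;
            } else if (error != null) {
                throw (Error) error;
            }
        }
    }
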
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java
new file mode 100644
index 0000000..55fa496
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Reader;
+import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for the {@link BufferToByteBuffer}.
+ */
+public class BufferToByteBufferTest {
+
+       @Test
+       public void testCompleteIsSameBufferAsOriginal() {
+               final ByteBuffer bb = ByteBuffer.allocateDirect(128);
+               final BufferToByteBuffer.Writer writer = new BufferToByteBuffer.Writer(bb);
+
+               final ByteBuffer result = writer.complete();
+
+               assertSame(bb, result);
+       }
+
+       @Test
+       public void testWriteReadMatchesCapacity() {
+               final ByteBuffer bb = ByteBuffer.allocateDirect(1200);
+               testWriteAndReadMultipleBuffers(bb, 100);
+       }
+
+       @Test
+       public void testWriteReadWithLeftoverCapacity() {
+               final ByteBuffer bb = ByteBuffer.allocateDirect(1177);
+               testWriteAndReadMultipleBuffers(bb, 100);
+       }
+
+       private void testWriteAndReadMultipleBuffers(ByteBuffer buffer, int numIntsPerBuffer) {
+               final Writer writer = new Writer(buffer);
+
+               int numBuffers = 0;
+               while (writer.writeBuffer(BufferBuilderTestUtils.buildBufferWithAscendingInts(1024, numIntsPerBuffer, 0))) {
+                       numBuffers++;
+               }
+
+               final ByteBuffer bb = writer.complete().slice();
+
+               final Reader reader = new Reader(bb);
+               Buffer buf;
+               while ((buf = reader.sliceNextBuffer()) != null) {
+                       BufferBuilderTestUtils.validateBufferWithAscendingInts(buf, numIntsPerBuffer, 0);
+                       numBuffers--;
+               }
+
+               assertEquals(0, numBuffers);
+       }
+
+}
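
The Writer/Reader pair exercised above frames multiple buffers into a single ByteBuffer: writeBuffer() returns false once a frame no longer fits (the "leftover capacity" case), and sliceNextBuffer() returns null once all frames are consumed. As a hedged sketch of this kind of length-prefixed framing — using a bare 4-byte length header purely for illustration, whereas the real header layout is whatever BufferToByteBuffer.HEADER_LENGTH defines:

    // Sketch, not the actual BufferToByteBuffer format: frames are written as
    // [int length][payload] and sliced back out until the data is exhausted.
    final class FrameSketch {

        static boolean writeFrame(java.nio.ByteBuffer target, byte[] payload) {
            if (target.remaining() < 4 + payload.length) {
                return false; // frame does not fit; caller starts a new buffer
            }
            target.putInt(payload.length);
            target.put(payload);
            return true;
        }

        static byte[] readFrame(java.nio.ByteBuffer source) {
            if (source.remaining() < 4) {
                return null; // the writer only emits complete frames, so we are done
            }
            byte[] payload = new byte[source.getInt()];
            source.get(payload);
            return payload;
        }
    }
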
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java
new file mode 100644
index 0000000..eec7dba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer;
+
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link MemoryMappedBuffers}.
+ */
+public class MemoryMappedBuffersTest {
+
+       @ClassRule
+       public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+       @Test
+       public void testWriteAndReadData() throws Exception {
+               testWriteAndReadData(10_000_000, Integer.MAX_VALUE);
+       }
+
+       @Test
+       public void testWriteAndReadDataAcrossRegions() throws Exception {
+               testWriteAndReadData(10_000_000, 1_276_347);
+       }
+
+       private static void testWriteAndReadData(int numInts, int regionSize) throws Exception {
+               try (MemoryMappedBuffers memory = MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) {
+                       final int numBuffers = writeInts(memory, numInts);
+                       memory.finishWrite();
+
+                       readInts(memory.getFullBuffers(), numBuffers, numInts);
+               }
+       }
+
+       @Test
+       public void returnNullAfterEmpty() throws Exception {
+               try (MemoryMappedBuffers memory = MemoryMappedBuffers.create(createTempPath())) {
+                       memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer());
+                       memory.finishWrite();
+
+                       final BufferSlicer reader = memory.getFullBuffers();
+                       assertNotNull(reader.sliceNextBuffer());
+
+                       // check that repeated calls after the data is exhausted return null
+                       assertNull(reader.sliceNextBuffer());
+                       assertNull(reader.sliceNextBuffer());
+                       assertNull(reader.sliceNextBuffer());
+               }
+       }
+
+       @Test
+       public void testDeleteFileOnClose() throws Exception {
+               final Path path = createTempPath();
+               final MemoryMappedBuffers mmb = MemoryMappedBuffers.create(path);
+               assertTrue(Files.exists(path));
+
+               mmb.close();
+
+               assertFalse(Files.exists(path));
+       }
+
+       @Test
+       public void testGetSizeSingleRegion() throws Exception {
+               testGetSize(Integer.MAX_VALUE);
+       }
+
+       @Test
+       public void testGetSizeMultipleRegions() throws Exception {
+               testGetSize(100_000);
+       }
+
+       private static void testGetSize(int regionSize) throws Exception {
+               final int bufferSize1 = 60_787;
+               final int bufferSize2 = 76_687;
+               final int expectedSize1 = bufferSize1 + BufferToByteBuffer.HEADER_LENGTH;
+               final int expectedSizeFinal = bufferSize1 + bufferSize2 + 2 * BufferToByteBuffer.HEADER_LENGTH;
+
+               try (MemoryMappedBuffers memory = MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) {
+
+                       memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize1));
+                       assertEquals(expectedSize1, memory.getSize());
+
+                       memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize2));
+                       assertEquals(expectedSizeFinal, memory.getSize());
+
+                       memory.finishWrite();
+                       assertEquals(expectedSizeFinal, memory.getSize());
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  utils
+       // ------------------------------------------------------------------------
+
+       private static int writeInts(MemoryMappedBuffers memory, int numInts) throws IOException {
+               final int bufferSize = 1024 * 1024; // 1 MiByte
+               final int numIntsInBuffer = bufferSize / 4;
+               int numBuffers = 0;
+
+               for (int nextValue = 0; nextValue < numInts; nextValue += numIntsInBuffer) {
+                       Buffer buffer = BufferBuilderTestUtils.buildBufferWithAscendingInts(bufferSize, numIntsInBuffer, nextValue);
+                       memory.writeBuffer(buffer);
+                       numBuffers++;
+               }
+
+               return numBuffers;
+       }
+
+       private static void readInts(MemoryMappedBuffers.BufferSlicer memory, int numBuffersExpected, int numInts) throws IOException {
+               Buffer b;
+               int nextValue = 0;
+               int numBuffers = 0;
+
+               while ((b = memory.sliceNextBuffer()) != null) {
+                       final int numIntsInBuffer = b.getSize() / 4;
+                       BufferBuilderTestUtils.validateBufferWithAscendingInts(b, numIntsInBuffer, nextValue);
+                       nextValue += numIntsInBuffer;
+                       numBuffers++;
+               }
+
+               assertEquals(numBuffersExpected, numBuffers);
+               assertThat(nextValue, Matchers.greaterThanOrEqualTo(numInts));
+       }
+
+       private static Path createTempPath() throws IOException {
+               return new File(TMP_FOLDER.newFolder(), "subpartitiondata").toPath();
+       }
+}
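
testWriteAndReadDataAcrossRegions deliberately picks a region size (1_276_347 bytes) that forces the written data to be spread across multiple mapped regions. Regions exist because a single java.nio.MappedByteBuffer cannot address more than Integer.MAX_VALUE bytes, so larger result files have to be mapped piecewise. A minimal sketch of region-wise mapping (a hypothetical helper for illustration, not MemoryMappedBuffers itself):

    // Hypothetical sketch: map a file as consecutive read-only regions of at
    // most regionSize bytes, since one MappedByteBuffer is limited to
    // Integer.MAX_VALUE bytes.
    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.ArrayList;
    import java.util.List;

    final class RegionMapper {

        static List<MappedByteBuffer> mapInRegions(Path file, int regionSize) throws IOException {
            final List<MappedByteBuffer> regions = new ArrayList<>();
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                final long fileSize = channel.size();
                for (long pos = 0; pos < fileSize; pos += regionSize) {
                    // the last region may be shorter than regionSize
                    final long length = Math.min(regionSize, fileSize - pos);
                    regions.add(channel.map(FileChannel.MapMode.READ_ONLY, pos, length));
                }
            }
            return regions;
        }
    }
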
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 36cd353..ff15b42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
@@ -35,6 +36,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -78,6 +80,13 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                return new PipelinedSubpartition(0, parent);
        }
 
+       @Override
+       ResultSubpartition createFailingWritesSubpartition() throws Exception {
+               // the tests relating to this are currently not supported by the PipelinedSubpartition
+               Assume.assumeTrue(false);
+               return null;
+       }
+
        @Test
        public void testIllegalReadViewRequest() throws Exception {
                final PipelinedSubpartition subpartition = createSubpartition();
@@ -280,4 +289,40 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                assertEquals(2, partition.getTotalNumberOfBuffers());
                assertEquals(0, partition.getTotalNumberOfBytes()); // buffer data is never consumed
        }
+
+       @Test
+       public void testReleaseParent() throws Exception {
+               final ResultSubpartition partition = createSubpartition();
+               verifyViewReleasedAfterParentRelease(partition);
+       }
+
+       @Test
+       public void testReleaseParentAfterSpilled() throws Exception {
+               final ResultSubpartition partition = createSubpartition();
+               partition.releaseMemory();
+
+               verifyViewReleasedAfterParentRelease(partition);
+       }
+
+       private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
+               // Add a bufferConsumer
+               BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
+               partition.add(bufferConsumer);
+               partition.finish();
+
+               // Create the view
+               BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+               ResultSubpartitionView view = partition.createReadView(listener);
+
+               // The added bufferConsumer and end-of-partition event
+               assertNotNull(view.getNextBuffer());
+               assertNotNull(view.getNextBuffer());
+
+               // Release the parent
+               assertFalse(view.isReleased());
+               partition.release();
+
+               // Verify that parent release is reflected at partition view
+               assertTrue(view.isReleased());
+       }
 }
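
A side note on the createFailingWritesSubpartition override above: the base class's new failing-writes tests do not apply to the in-memory pipelined implementation, so the subclass opts out via JUnit's Assume mechanism, which aborts the test as skipped rather than failed. The general pattern, as a small standalone sketch:

    // Sketch of Assume-based opt-out in JUnit 4: a false assumption aborts the
    // test and the runner reports it as skipped, not as a failure.
    import org.junit.Assume;
    import org.junit.Test;

    public class AssumeSketchTest {

        @Test
        public void testNeedingUnsupportedFixture() {
            Assume.assumeTrue("not supported by this implementation", false);
            // never reached
        }
    }
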
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index c911df7..5846d6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -202,11 +202,6 @@ public class ResultPartitionTest {
        }
 
        @Test
-       public void testReleaseMemoryOnBlockingPartition() throws Exception {
-               testReleaseMemory(ResultPartitionType.BLOCKING);
-       }
-
-       @Test
        public void testReleaseMemoryOnPipelinedPartition() throws Exception {
                testReleaseMemory(ResultPartitionType.PIPELINED);
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
deleted file mode 100644
index 71dbc2b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ /dev/null
@@ -1,800 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import 
org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.fillBufferBuilder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link SpillableSubpartition}.
- */
-public class SpillableSubpartitionTest extends SubpartitionTestBase {
-       private static final int BUFFER_DATA_SIZE = 4096;
-
-       @Rule
-       public ExpectedException exception = ExpectedException.none();
-
-       /** Executor service for concurrent produce/consume tests. */
-       private static final ExecutorService executorService = 
Executors.newCachedThreadPool();
-
-       /** Asynchronous I/O manager. */
-       private static IOManager ioManager;
-
-       @BeforeClass
-       public static void setup() {
-               ioManager = new IOManagerAsync();
-       }
-
-       @AfterClass
-       public static void shutdown() {
-               executorService.shutdownNow();
-               ioManager.shutdown();
-       }
-
-       @Override
-       SpillableSubpartition createSubpartition() {
-               return createSubpartition(ioManager);
-       }
-
-       private static SpillableSubpartition createSubpartition(IOManager 
ioManager) {
-               ResultPartition parent = mock(ResultPartition.class);
-               BufferProvider bufferProvider = mock(BufferProvider.class);
-               when(parent.getBufferProvider()).thenReturn(bufferProvider);
-               when(bufferProvider.getMemorySegmentSize()).thenReturn(32 * 
1024);
-               return new SpillableSubpartition(0, parent, ioManager);
-       }
-
-       /**
-        * Tests a fix for FLINK-2384.
-        *
-        * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-2384";>FLINK-2384</a>
-        */
-       @Test
-       public void testConcurrentFinishAndReleaseMemory() throws Exception {
-               // Latches to blocking
-               final CountDownLatch doneLatch = new CountDownLatch(1);
-               final CountDownLatch blockLatch = new CountDownLatch(1);
-
-               // Blocking spill writer (blocks on the close call)
-               AsynchronousBufferFileWriter spillWriter = 
mock(AsynchronousBufferFileWriter.class);
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               blockLatch.countDown();
-                               doneLatch.await();
-                               return null;
-                       }
-               }).when(spillWriter).close();
-
-               // Mock I/O manager returning the blocking spill writer
-               IOManager ioManager = mock(IOManager.class);
-               
when(ioManager.createBufferFileWriter(nullable(FileIOChannel.ID.class)))
-                       .thenReturn(spillWriter);
-
-               // The partition
-               final SpillableSubpartition partition = new 
SpillableSubpartition(
-                       0, mock(ResultPartition.class), ioManager);
-
-               // Spill the partition initially (creates the spill writer)
-               assertEquals(0, partition.releaseMemory());
-
-               ExecutorService executor = Executors.newSingleThreadExecutor();
-
-               // Finish the partition (this blocks because of the mock 
blocking writer)
-               Future<Void> blockingFinish = executor.submit(new 
Callable<Void>() {
-                       @Override
-                       public Void call() throws Exception {
-                               partition.finish();
-                               return null;
-                       }
-               });
-
-               // Ensure that the blocking call has been made
-               blockLatch.await();
-
-               // This call needs to go through. FLINK-2384 discovered a bug, 
in
-               // which the finish call was holding a lock, which was leading 
to a
-               // deadlock when another operation on the partition was 
happening.
-               partition.releaseMemory();
-
-               // Check that the finish call succeeded w/o problems as well to 
avoid
-               // false test successes.
-               doneLatch.countDown();
-               blockingFinish.get();
-       }
-
-       /**
-        * Tests a fix for FLINK-2412.
-        *
-        * @see <a 
href="https://issues.apache.org/jira/browse/FLINK-2412";>FLINK-2412</a>
-        */
-       @Test
-       public void testReleasePartitionAndGetNext() throws Exception {
-               // Create partition and add some buffers
-               SpillableSubpartition partition = createSubpartition();
-
-               partition.finish();
-
-               // Create the read view
-               ResultSubpartitionView readView = spy(partition
-                       .createReadView(new NoOpBufferAvailablityListener()));
-
-               // The released state check (of the parent) needs to be 
independent
-               // of the released state of the view.
-               doNothing().when(readView).releaseAllResources();
-
-               // Release the partition, but the view does not notice yet.
-               partition.release();
-
-               assertNull(readView.getNextBuffer());
-       }
-
-       /**
-        * Tests that a spilled partition is correctly read back in via a 
spilled
-        * read view.
-        */
-       @Test
-       public void testConsumeSpilledPartition() throws Exception {
-               SpillableSubpartition partition = createSubpartition();
-
-               BufferConsumer bufferConsumer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               BufferConsumer eventBufferConsumer =
-                       EventSerializer.toBufferConsumer(new 
CancelCheckpointMarker(1));
-               final int eventSize = eventBufferConsumer.getWrittenBytes();
-
-               partition.add(bufferConsumer.copy());
-               partition.add(bufferConsumer.copy());
-               partition.add(eventBufferConsumer);
-               partition.add(bufferConsumer);
-
-               assertEquals(4, partition.getTotalNumberOfBuffers());
-               assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(0, partition.getTotalNumberOfBytes()); // only 
updated when getting/releasing the buffers
-
-               assertFalse(bufferConsumer.isRecycled());
-               assertEquals(4, partition.releaseMemory());
-               // now the bufferConsumer may be freed, depending on the timing 
of the write operation
-               // -> let's do this check at the end of the test (to save some 
time)
-               // still same statistics
-               assertEquals(4, partition.getTotalNumberOfBuffers());
-               assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, 
partition.getTotalNumberOfBytes());
-
-               partition.finish();
-               // + one EndOfPartitionEvent
-               assertEquals(5, partition.getTotalNumberOfBuffers());
-               assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes());
-
-               AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
-               SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
-
-               assertEquals(1, listener.getNumNotifications());
-               assertFalse(reader.nextBufferIsEvent()); // buffer
-
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
-               assertEquals(2, partition.getBuffersInBacklog());
-
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
-               assertEquals(1, partition.getBuffersInBacklog());
-
-               assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
-               assertEquals(1, partition.getBuffersInBacklog());
-
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
-               assertEquals(0, partition.getBuffersInBacklog());
-
-               assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
-               assertEquals(0, partition.getBuffersInBacklog());
-
-               // finally check that the bufferConsumer has been freed after a 
successful (or failed) write
-               final long deadline = System.currentTimeMillis() + 30_000L; // 
30 secs
-               while (!bufferConsumer.isRecycled() && 
System.currentTimeMillis() < deadline) {
-                       Thread.sleep(1);
-               }
-               assertTrue(bufferConsumer.isRecycled());
-       }
-
-       /**
-        * Tests that a spilled partition is correctly read back in via a 
spilled read view. The
-        * partition went into spilled state before adding buffers and the 
access pattern resembles
-        * the actual use of {@link 
org.apache.flink.runtime.io.network.api.writer.RecordWriter}.
-        */
-       @Test
-       public void testConsumeSpilledPartitionSpilledBeforeAdd() throws 
Exception {
-               SpillableSubpartition partition = createSubpartition();
-               assertEquals(0, partition.releaseMemory()); // <---- SPILL to 
disk
-
-               BufferBuilder[] bufferBuilders = new BufferBuilder[] {
-                       createBufferBuilder(BUFFER_DATA_SIZE),
-                       createBufferBuilder(BUFFER_DATA_SIZE),
-                       createBufferBuilder(BUFFER_DATA_SIZE),
-                       createBufferBuilder(BUFFER_DATA_SIZE)
-               };
-               BufferConsumer[] bufferConsumers = 
Arrays.stream(bufferBuilders).map(
-                       BufferBuilder::createBufferConsumer
-               ).toArray(BufferConsumer[]::new);
-
-               BufferConsumer eventBufferConsumer =
-                       EventSerializer.toBufferConsumer(new 
CancelCheckpointMarker(1));
-               final int eventSize = eventBufferConsumer.getWrittenBytes();
-
-               // note: only the newest buffer may be unfinished!
-               partition.add(bufferConsumers[0]);
-               fillBufferBuilder(bufferBuilders[0], BUFFER_DATA_SIZE).finish();
-               partition.add(bufferConsumers[1]);
-               fillBufferBuilder(bufferBuilders[1], BUFFER_DATA_SIZE).finish();
-               partition.add(eventBufferConsumer);
-               partition.add(bufferConsumers[2]);
-               bufferBuilders[2].finish(); // remains empty
-               partition.add(bufferConsumers[3]);
-               // last one: partially filled, unfinished
-               fillBufferBuilder(bufferBuilders[3], BUFFER_DATA_SIZE / 2);
-               // finished buffers only:
-               int expectedSize = BUFFER_DATA_SIZE * 2 + eventSize;
-
-               // now the bufferConsumer may be freed, depending on the timing 
of the write operation
-               // -> let's do this check at the end of the test (to save some 
time)
-               // still same statistics
-               assertEquals(5, partition.getTotalNumberOfBuffers());
-               assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(expectedSize, partition.getTotalNumberOfBytes());
-
-               partition.finish();
-               expectedSize += BUFFER_DATA_SIZE / 2; // previously unfinished 
buffer
-               expectedSize += 4; // + one EndOfPartitionEvent
-               assertEquals(6, partition.getTotalNumberOfBuffers());
-               assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(expectedSize, partition.getTotalNumberOfBytes());
-               Arrays.stream(bufferConsumers).forEach(bufferConsumer -> 
assertTrue(bufferConsumer.isRecycled()));
-
-               AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
-               SpilledSubpartitionView reader = (SpilledSubpartitionView) 
partition.createReadView(listener);
-
-               assertEquals(1, listener.getNumNotifications());
-               assertFalse(reader.nextBufferIsEvent()); // full buffer
-
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
true);
-               assertEquals(2, partition.getBuffersInBacklog());
-
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
-               assertEquals(1, partition.getBuffersInBacklog());
-
-               assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
-               assertEquals(1, partition.getBuffersInBacklog());
-
-               assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, 
true);
-               assertEquals(0, partition.getBuffersInBacklog());
-
-               assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
-               assertEquals(0, partition.getBuffersInBacklog());
-
-               //close buffer consumers
-               Arrays.stream(bufferConsumers).forEach(bufferConsumer -> 
bufferConsumer.close());
-       }
-
-       /**
-        * Tests that a spilled partition is correctly read back in via a 
spilled
-        * read view.
-        */
-       @Test
-       public void testConsumeSpillablePartitionSpilledDuringConsume() throws 
Exception {
-               SpillableSubpartition partition = createSubpartition();
-
-               BufferConsumer bufferConsumer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               BufferConsumer eventBufferConsumer =
-                       EventSerializer.toBufferConsumer(new 
CancelCheckpointMarker(1));
-               final int eventSize = eventBufferConsumer.getWrittenBytes();
-
-               partition.add(bufferConsumer.copy());
-               partition.add(bufferConsumer.copy());
-               partition.add(eventBufferConsumer);
-               partition.add(bufferConsumer);
-               partition.finish();
-
-               assertEquals(5, partition.getTotalNumberOfBuffers());
-               assertEquals(3, partition.getBuffersInBacklog());
-               assertEquals(0, partition.getTotalNumberOfBytes()); // only 
updated when getting/spilling the buffers
-
-               AwaitableBufferAvailablityListener listener = new 
AwaitableBufferAvailablityListener();
-               SpillableSubpartitionView reader = (SpillableSubpartitionView) 
partition.createReadView(listener);
-
-               // Initial notification
-               assertEquals(1, listener.getNumNotifications());
-               assertFalse(bufferConsumer.isRecycled());
-
-               assertFalse(reader.nextBufferIsEvent());
-
-               // first buffer (non-spilled)
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, 
false);
-               assertEquals(BUFFER_DATA_SIZE, 
partition.getTotalNumberOfBytes()); // only updated when getting/spilling the 
buffers
-               assertEquals(2, partition.getBuffersInBacklog());
-               assertEquals(1, listener.getNumNotifications()); // since 
isMoreAvailable is set to true, no need for notification
-               assertFalse(bufferConsumer.isRecycled());
-
-               // Spill now
-               assertEquals(3, partition.releaseMemory());
-               assertFalse(bufferConsumer.isRecycled()); // still one in the 
reader!
-               // still same statistics:
-               assertEquals(5, partition.getTotalNumberOfBuffers());
-               assertEquals(2, partition.getBuffersInBacklog());
-               // only updated when getting/spilling the buffers but without 
the nextBuffer (kept in memory)
-               assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, 
partition.getTotalNumberOfBytes());
-
-               // wait for successfully spilling all buffers (before that we 
may not access any spilled buffer and cannot rely on isMoreAvailable!)
-               listener.awaitNotifications(2, 30_000);
-               // Spiller finished
-               assertEquals(2, listener.getNumNotifications());
-
-               // after consuming and releasing the next buffer, the 
bufferConsumer may be freed,
-               // depending on the timing of the last write operation
-               // -> retain once so that we can check below
-               Buffer buffer = bufferConsumer.build();
-               buffer.retainBuffer();
-
-               // second buffer (retained in SpillableSubpartition#nextBuffer)
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, 
false);
-               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer 
statistics
-               assertEquals(1, partition.getBuffersInBacklog());
-
-               bufferConsumer.close(); // recycle the retained buffer from 
above (should be the last reference!)
-
-               // the event (spilled)
-               assertNextEvent(reader, eventSize, 
CancelCheckpointMarker.class, true, 1, false, true);
-               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
-               assertEquals(1, partition.getBuffersInBacklog());
-
-               // last buffer (spilled)
-               assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
-               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
-               assertEquals(0, partition.getBuffersInBacklog());
-
-               buffer.recycleBuffer();
-               assertTrue(buffer.isRecycled());
-
-               // End of partition
-               assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, 
false, true);
-               assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, 
partition.getTotalNumberOfBytes()); // already updated during spilling
-               assertEquals(0, partition.getBuffersInBacklog());
-
-               // finally check that the bufferConsumer has been freed after a 
successful (or failed) write
-               final long deadline = System.currentTimeMillis() + 30_000L; // 
30 secs
-               while (!bufferConsumer.isRecycled() && 
System.currentTimeMillis() < deadline) {
-                       Thread.sleep(1);
-               }
-               assertTrue(bufferConsumer.isRecycled());
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a 
spillable finished partition.
-        */
-       @Test
-       public void testAddOnFinishedSpillablePartition() throws Exception {
-               testAddOnFinishedPartition(false);
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a 
spilled finished partition.
-        */
-       @Test
-       public void testAddOnFinishedSpilledPartition() throws Exception {
-               testAddOnFinishedPartition(true);
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a 
finished partition.
-        *
-        * @param spilled
-        *              whether the partition should be spilled to disk 
(<tt>true</tt>) or not (<tt>false</tt>,
-        *              spillable).
-        */
-       private void testAddOnFinishedPartition(boolean spilled) throws 
Exception {
-               SpillableSubpartition partition = createSubpartition();
-               if (spilled) {
-                       assertEquals(0, partition.releaseMemory());
-               }
-               partition.finish();
-               // finish adds an EndOfPartitionEvent
-               assertEquals(1, partition.getTotalNumberOfBuffers());
-               // if not spilled, statistics are only updated when consuming 
the buffers
-               assertEquals(spilled ? 4 : 0, 
partition.getTotalNumberOfBytes());
-
-               BufferConsumer buffer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               try {
-                       partition.add(buffer);
-               } finally {
-                       if (!buffer.isRecycled()) {
-                               buffer.close();
-                               Assert.fail("buffer not recycled");
-                       }
-               }
-               // still same statistics
-               assertEquals(1, partition.getTotalNumberOfBuffers());
-               // if not spilled, statistics are only updated when consuming 
the buffers
-               assertEquals(spilled ? 4 : 0, 
partition.getTotalNumberOfBytes());
-       }
-
-       @Test
-       public void testAddOnReleasedSpillablePartition() throws Exception {
-               testAddOnReleasedPartition(false);
-       }
-
-       @Test
-       public void testAddOnReleasedSpilledPartition() throws Exception {
-               testAddOnReleasedPartition(true);
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a 
released partition.
-        *
-        * @param spilled
-        *              whether the partition should be spilled to disk 
(<tt>true</tt>) or not (<tt>false</tt>,
-        *              spillable).
-        */
-       private void testAddOnReleasedPartition(boolean spilled) throws 
Exception {
-               SpillableSubpartition partition = createSubpartition();
-               partition.release();
-               if (spilled) {
-                       assertEquals(0, partition.releaseMemory());
-               }
-
-               BufferConsumer buffer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               boolean bufferRecycled;
-               try {
-                       partition.add(buffer);
-               } finally {
-                       bufferRecycled = buffer.isRecycled();
-                       if (!bufferRecycled) {
-                               buffer.close();
-                       }
-               }
-               if (!bufferRecycled) {
-                       Assert.fail("buffer not recycled");
-               }
-               assertEquals(0, partition.getTotalNumberOfBuffers());
-               assertEquals(0, partition.getTotalNumberOfBytes());
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a 
spilled partition where adding the
-        * write request fails with an exception.
-        */
-       @Test
-       public void testAddOnSpilledPartitionWithSlowWriter() throws Exception {
-               // simulate slow writer by a no-op write operation
-               IOManager ioManager = new 
IOManagerAsyncWithNoOpBufferFileWriter();
-               SpillableSubpartition partition = createSubpartition(ioManager);
-               assertEquals(0, partition.releaseMemory());
-
-               BufferConsumer buffer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               boolean bufferRecycled;
-               try {
-                       partition.add(buffer);
-               } finally {
-                       ioManager.shutdown();
-                       bufferRecycled = buffer.isRecycled();
-                       if (!bufferRecycled) {
-                               buffer.close();
-                       }
-               }
-               if (bufferRecycled) {
-                       Assert.fail("buffer recycled before the write operation 
completed");
-               }
-               assertEquals(1, partition.getTotalNumberOfBuffers());
-               assertEquals(BUFFER_DATA_SIZE, 
partition.getTotalNumberOfBytes());
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable 
partition without a view
-        * but with a writer that does not do any write to check for correct 
buffer recycling.
-        */
-       @Test
-       public void testReleaseOnSpillablePartitionWithoutViewWithSlowWriter() 
throws Exception {
-               testReleaseOnSpillablePartitionWithSlowWriter(false);
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable 
partition which has a
-        * view associated with it and a writer that does not do any write to 
check for correct buffer
-        * recycling.
-        */
-       @Test
-       public void testReleaseOnSpillablePartitionWithViewWithSlowWriter() 
throws Exception {
-               testReleaseOnSpillablePartitionWithSlowWriter(true);
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#releaseMemory()} with a spillable 
partition which has a a
-        * writer that does not do any write to check for correct buffer 
recycling.
-        */
-       private void testReleaseOnSpillablePartitionWithSlowWriter(boolean 
createView) throws Exception {
-               // simulate slow writer by a no-op write operation
-               IOManager ioManager = new 
IOManagerAsyncWithNoOpBufferFileWriter();
-               SpillableSubpartition partition = createSubpartition(ioManager);
-
-               BufferConsumer buffer1 = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               BufferConsumer buffer2 = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               try {
-                       // we need two buffers because the view will use one of 
them and not release it
-                       partition.add(buffer1);
-                       partition.add(buffer2);
-                       assertFalse("buffer1 should not be recycled (still in 
the queue)", buffer1.isRecycled());
-                       assertFalse("buffer2 should not be recycled (still in 
the queue)", buffer2.isRecycled());
-                       assertEquals(2, partition.getTotalNumberOfBuffers());
-                       assertEquals(0, partition.getTotalNumberOfBytes()); // 
only updated when buffers are consumed or spilled
-
-                       if (createView) {
-                               // Create a read view
-                               partition.finish();
-                               partition.createReadView(new 
NoOpBufferAvailablityListener());
-                               assertEquals(0, 
partition.getTotalNumberOfBytes()); // only updated when buffers are consumed 
or spilled
-                       }
-
-                       // one instance of the buffers is placed in the view's 
nextBuffer and not released
-                       // (if there is no view, there will be no additional 
EndOfPartitionEvent)
-                       assertEquals(2, partition.releaseMemory());
-                       assertFalse("buffer1 should not be recycled (advertised 
as nextBuffer)", buffer1.isRecycled());
-                       assertFalse("buffer2 should not be recycled (not 
written yet)", buffer2.isRecycled());
-               } finally {
-                       ioManager.shutdown();
-                       if (!buffer1.isRecycled()) {
-                               buffer1.close();
-                       }
-                       if (!buffer2.isRecycled()) {
-                               buffer2.close();
-                       }
-               }
-               // note: a view requires a finished partition which has an 
additional EndOfPartitionEvent
-               assertEquals(2 + (createView ? 1 : 0), 
partition.getTotalNumberOfBuffers());
-               // with a view, one buffer remains in nextBuffer and is not 
counted yet
-               assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : 
BUFFER_DATA_SIZE), partition.getTotalNumberOfBytes());
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#add(BufferConsumer)} with a 
spilled partition where adding the
-        * write request fails with an exception.
-        */
-       @Test
-       public void testAddOnSpilledPartitionWithFailingWriter() throws 
Exception {
-               IOManager ioManager = new 
IOManagerAsyncWithClosedBufferFileWriter();
-               SpillableSubpartition partition = createSubpartition(ioManager);
-               assertEquals(0, partition.releaseMemory());
-
-               exception.expect(IOException.class);
-
-               BufferConsumer buffer = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               boolean bufferRecycled;
-               try {
-                       partition.add(buffer);
-               } finally {
-                       ioManager.shutdown();
-                       bufferRecycled = buffer.isRecycled();
-                       if (!bufferRecycled) {
-                               buffer.close();
-                       }
-               }
-               if (!bufferRecycled) {
-                       Assert.fail("buffer not recycled");
-               }
-               assertEquals(0, partition.getTotalNumberOfBuffers());
-               assertEquals(0, partition.getTotalNumberOfBytes());
-       }
-
-       /**
-        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spillable partition and no
-        * read view attached.
-        */
-       @Test
-       public void testCleanupReleasedSpillablePartitionNoView() throws 
Exception {
-               testCleanupReleasedPartition(false, false);
-       }
-
-       /**
-        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spillable partition and a
-        * read view attached - [FLINK-8371].
-        */
-       @Test
-       public void testCleanupReleasedSpillablePartitionWithView() throws 
Exception {
-               testCleanupReleasedPartition(false, true);
-       }
-
-       /**
-        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spilled partition and no
-        * read view attached.
-        */
-       @Test
-       public void testCleanupReleasedSpilledPartitionNoView() throws 
Exception {
-               testCleanupReleasedPartition(true, false);
-       }
-
-       /**
-        * Tests cleanup of {@link SpillableSubpartition#release()} with a 
spilled partition and a
-        * read view attached.
-        */
-       @Test
-       public void testCleanupReleasedSpilledPartitionWithView() throws 
Exception {
-               testCleanupReleasedPartition(true, true);
-       }
-
-       /**
-        * Tests cleanup of {@link SpillableSubpartition#release()}.
-        *
-        * @param spilled
-        *              whether the partition should be spilled to disk 
(<tt>true</tt>) or not (<tt>false</tt>,
-        *              spillable)
-        * @param createView
-        *              whether the partition should have a view attached to it 
(<tt>true</tt>) or not (<tt>false</tt>)
-        */
-       private void testCleanupReleasedPartition(boolean spilled, boolean 
createView) throws Exception {
-               SpillableSubpartition partition = createSubpartition();
-
-               BufferConsumer buffer1 = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               BufferConsumer buffer2 = 
createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
-               boolean buffer1Recycled;
-               boolean buffer2Recycled;
-               try {
-                       partition.add(buffer1);
-                       partition.add(buffer2);
-                       // create the read view before spilling
-                       // (tests both code paths since this view may then 
contain the spilled view)
-                       ResultSubpartitionView view = null;
-                       if (createView) {
-                               partition.finish();
-                               view = partition.createReadView(new 
NoOpBufferAvailablityListener());
-                       }
-                       if (spilled) {
-                               // note: in case we create a view, one buffer 
will already reside in the view and
-                               //       one EndOfPartitionEvent will be added 
instead (so overall the number of
-                               //       buffers to spill is the same
-                               assertEquals(2, partition.releaseMemory());
-                       }
-
-                       partition.release();
-
-                       assertTrue(partition.isReleased());
-                       if (createView) {
-                               assertTrue(view.isReleased());
-                       }
-                       assertTrue(buffer1.isRecycled());
-               } finally {
-                       buffer1Recycled = buffer1.isRecycled();
-                       if (!buffer1Recycled) {
-                               buffer1.close();
-                       }
-                       buffer2Recycled = buffer2.isRecycled();
-                       if (!buffer2Recycled) {
-                               buffer2.close();
-                       }
-               }
-               if (!buffer1Recycled) {
-                       Assert.fail("buffer 1 not recycled");
-               }
-               if (!buffer2Recycled) {
-                       Assert.fail("buffer 2 not recycled");
-               }
-               // note: in case we create a view, there will be an additional 
EndOfPartitionEvent
-               assertEquals(createView ? 3 : 2, 
partition.getTotalNumberOfBuffers());
-               if (spilled) {
-                       // with a view, one buffer remains in nextBuffer and is 
not counted yet
-                       assertEquals(BUFFER_DATA_SIZE + (createView ? 4 : 
BUFFER_DATA_SIZE),
-                               partition.getTotalNumberOfBytes());
-               } else {
-                       // non-spilled byte statistics are only updated when 
buffers are consumed
-                       assertEquals(0, partition.getTotalNumberOfBytes());
-               }
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers} 
spilled bytes and
-        * buffers counting.
-        */
-       @Test
-       public void testSpillFinishedBufferConsumersFull() throws Exception {
-               SpillableSubpartition partition = createSubpartition();
-               BufferBuilder bufferBuilder = 
createBufferBuilder(BUFFER_DATA_SIZE);
-
-               partition.add(bufferBuilder.createBufferConsumer());
-               assertEquals(0, partition.releaseMemory());
-               assertEquals(1, partition.getBuffersInBacklog());
-               // finally fill the buffer with some bytes
-               fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
-               assertEquals(BUFFER_DATA_SIZE, 
partition.spillFinishedBufferConsumers(false));
-               assertEquals(1, partition.getBuffersInBacklog());
-       }
-
-       /**
-        * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers} 
spilled bytes and
-        * buffers counting with partially filled buffers.
-        */
-       @Test
-       public void testSpillFinishedBufferConsumersPartial() throws Exception {
-               SpillableSubpartition partition = createSubpartition();
-               BufferBuilder bufferBuilder = 
createBufferBuilder(BUFFER_DATA_SIZE * 2);
-
-               partition.add(bufferBuilder.createBufferConsumer());
-               fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE);
-
-               assertEquals(0, partition.releaseMemory());
-               assertEquals(2, partition.getBuffersInBacklog()); // partial 
one spilled, buffer consumer still enqueued
-               // finally fill the buffer with some bytes
-               fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
-               assertEquals(BUFFER_DATA_SIZE, 
partition.spillFinishedBufferConsumers(false));
-               assertEquals(2, partition.getBuffersInBacklog());
-       }
-
-       /**
-        * An {@link IOManagerAsync} that creates closed {@link 
BufferFileWriter} instances in its
-        * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
-        *
-        * <p>These {@link BufferFileWriter} objects will thus throw an 
exception when trying to add
-        * write requests, e.g. by calling {@link 
BufferFileWriter#writeBlock(Object)}.
-        */
-       private static class IOManagerAsyncWithClosedBufferFileWriter extends 
IOManagerAsync {
-               @Override
-               public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
-                               throws IOException {
-                       BufferFileWriter bufferFileWriter = 
super.createBufferFileWriter(channelID);
-                       bufferFileWriter.close();
-                       return bufferFileWriter;
-               }
-       }
-
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 7c083ad..13cab65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -27,10 +27,8 @@ import org.junit.Test;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 /**
  * Basic subpartition behaviour tests.
@@ -40,11 +38,30 @@ public abstract class SubpartitionTestBase extends TestLogger {
        /**
         * Return the subpartition to be tested.
         */
-       abstract ResultSubpartition createSubpartition();
+       abstract ResultSubpartition createSubpartition() throws Exception;
+
+       /**
+        * Return the subpartition to be used for tests where write calls should fail.
+        */
+       abstract ResultSubpartition createFailingWritesSubpartition() throws Exception;
 
        // ------------------------------------------------------------------------
 
        @Test
+       public void createReaderAfterDispose() throws Exception {
+               final ResultSubpartition subpartition = createSubpartition();
+               subpartition.release();
+
+               try {
+                       subpartition.createReadView(() -> {});
+                       fail("expected an exception");
+               }
+               catch (IllegalStateException e) {
+                       // expected
+               }
+       }
+
+       @Test
        public void testAddAfterFinish() throws Exception {
                final ResultSubpartition subpartition = createSubpartition();
 
@@ -52,14 +69,14 @@ public abstract class SubpartitionTestBase extends TestLogger {
                        subpartition.finish();
                        assertEquals(1, subpartition.getTotalNumberOfBuffers());
                        assertEquals(0, subpartition.getBuffersInBacklog());
-                       assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers
 
                        BufferConsumer bufferConsumer = createFilledBufferConsumer(4096, 4096);
 
                        assertFalse(subpartition.add(bufferConsumer));
+                       assertTrue(bufferConsumer.isRecycled());
+
                        assertEquals(1, subpartition.getTotalNumberOfBuffers());
                        assertEquals(0, subpartition.getBuffersInBacklog());
-                       assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated after consuming the buffers
                } finally {
                        if (subpartition != null) {
                                subpartition.release();
@@ -74,16 +91,11 @@ public abstract class SubpartitionTestBase extends TestLogger {
                try {
                        subpartition.release();
 
-                       assertEquals(0, subpartition.getTotalNumberOfBuffers());
-                       assertEquals(0, subpartition.getBuffersInBacklog());
-                       assertEquals(0, subpartition.getTotalNumberOfBytes());
-
                        BufferConsumer bufferConsumer = createFilledBufferConsumer(4096, 4096);
 
                        assertFalse(subpartition.add(bufferConsumer));
-                       assertEquals(0, subpartition.getTotalNumberOfBuffers());
-                       assertEquals(0, subpartition.getBuffersInBacklog());
-                       assertEquals(0, subpartition.getTotalNumberOfBytes());
+                       assertTrue(bufferConsumer.isRecycled());
+
                } finally {
                        if (subpartition != null) {
                                subpartition.release();
@@ -92,38 +104,71 @@ public abstract class SubpartitionTestBase extends TestLogger {
        }
 
        @Test
-       public void testReleaseParent() throws Exception {
+       public void testReleasingReaderDoesNotReleasePartition() throws Exception {
                final ResultSubpartition partition = createSubpartition();
-               verifyViewReleasedAfterParentRelease(partition);
+               partition.add(createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
+               partition.finish();
+
+               final ResultSubpartitionView reader = partition.createReadView(new NoOpBufferAvailablityListener());
+
+               assertFalse(partition.isReleased());
+               assertFalse(reader.isReleased());
+
+               reader.releaseAllResources();
+
+               assertTrue(reader.isReleased());
+               assertFalse(partition.isReleased());
+
+               partition.release();
        }
 
        @Test
-       public void testReleaseParentAfterSpilled() throws Exception {
+       public void testReleaseIsIdempotent() throws Exception {
                final ResultSubpartition partition = createSubpartition();
-               partition.releaseMemory();
+               partition.add(createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
+               partition.finish();
 
-               verifyViewReleasedAfterParentRelease(partition);
+               partition.release();
+               partition.release();
+               partition.release();
        }
 
-       private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
-               // Add a bufferConsumer
-               BufferConsumer bufferConsumer = createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
-               partition.add(bufferConsumer);
+       @Test
+       public void testReadAfterDispose() throws Exception {
+               final ResultSubpartition partition = createSubpartition();
+               partition.add(createFilledBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE));
                partition.finish();
 
-               // Create the view
-               BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-               ResultSubpartitionView view = partition.createReadView(listener);
+               final ResultSubpartitionView reader = partition.createReadView(new NoOpBufferAvailablityListener());
+               reader.releaseAllResources();
 
-               // The added bufferConsumer and end-of-partition event
-               assertNotNull(view.getNextBuffer());
-               assertNotNull(view.getNextBuffer());
+               // the reader must not throw an exception
+               reader.getNextBuffer();
 
-               // Release the parent
-               assertFalse(view.isReleased());
-               partition.release();
+               // ideally, we want this to be null, but the pipelined partition still serves data
+               // after dispose (which is unintuitive, but does not affect correctness)
+//             assertNull(reader.getNextBuffer());
+       }
+
+       @Test
+       public void testRecycleBufferAndConsumerOnFailure() throws Exception {
+               final ResultSubpartition subpartition = createFailingWritesSubpartition();
+               try {
+                       final BufferConsumer consumer = BufferBuilderTestUtils.createFilledBufferConsumer(100);
+
+                       try {
+                               subpartition.add(consumer);
+                               subpartition.flush();
+                               fail("should fail with an exception");
+                       }
+                       catch (Exception ignored) {
+                               // expected
+                       }
 
-               // Verify that parent release is reflected at partition view
+                       assertTrue(consumer.isRecycled());
+               }
+               finally {
+                       subpartition.release();
+               }
        }
 }

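Taken together, the reworked SubpartitionTestBase pins down a small lifecycle contract: creating a read view on a released partition fails with an IllegalStateException, releasing a reader does not release its parent partition, and releasing a partition is idempotent. Below is a minimal model of just those invariants, under invented names (SubpartitionLifecycleModel, Reader); it is not Flink's implementation, only the behavior the base tests assert:

    import java.util.concurrent.atomic.AtomicBoolean;

    final class SubpartitionLifecycleModel {

        private final AtomicBoolean released = new AtomicBoolean(false);

        // Idempotent: only the first call performs cleanup, later calls are no-ops.
        void release() {
            if (released.compareAndSet(false, true)) {
                // free buffers / unmap files exactly once here
            }
        }

        boolean isReleased() {
            return released.get();
        }

        // Fails fast once the partition is gone, as in createReaderAfterDispose().
        Reader createReadView() {
            if (released.get()) {
                throw new IllegalStateException("subpartition has been released");
            }
            return new Reader();
        }

        // A reader has its own flag; releasing it must not touch the parent.
        static final class Reader {
            private final AtomicBoolean released = new AtomicBoolean(false);

            void releaseAllResources() {
                released.set(true);
            }

            boolean isReleased() {
                return released.get();
            }
        }

        public static void main(String[] args) {
            SubpartitionLifecycleModel partition = new SubpartitionLifecycleModel();
            Reader reader = partition.createReadView();

            reader.releaseAllResources();
            assert reader.isReleased();
            assert !partition.isReleased(); // reader release does not cascade upward

            partition.release();
            partition.release(); // safe: release is idempotent
            assert partition.isReleased();
        }
    }

The AtomicBoolean plus compareAndSet makes the idempotence cheap and thread-safe: only the transition from false to true runs the cleanup, so repeated or concurrent release() calls are harmless.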