This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d88882cc6db523a4fe2d70adad99bb66dc5d48c1
Author: Stephan Ewen <[email protected]>
AuthorDate: Sat Jun 8 16:05:24 2019 +0200

    [FLINK-12986] [network] Add different implementations to store/serve the 
data from the BoundedBlockingSubpartition
    
    This PR encapsulates the following steps:
      - introduce the BoundedData interface to abstract the way that buffers 
are stored / read as part of a bounded stream subpartition
      - Change BufferToByteBuffer from reader/writer to encoder util that works 
across byte buffers and byte channels
        so we can use it for both file-based implementations and memory-buffer 
based implementations
      - Add FileChannelBoundedData which writes the buffers to a file and reads 
them back from the file
      - Add FileChannelMemoryMappedBoundedData which writes the buffer to a 
file and maps that file into memory for reading
      - Add common tests for all implementations of BoundedData
      - Rename MemoryMappedBuffers to MemoryMappedBoundedData to be consistent 
with other names.
      - Instantiate new bounded data store implementations from ResultPartition
      - Use FILE_MMAP implementation on 64bit systems and FILE implementation 
on other systems
---
 .../org/apache/flink/util/MemoryArchitecture.java  |  77 +++++++
 .../apache/flink/util/MemoryArchitectureTest.java  |  36 +++
 .../io/network/buffer/BufferPoolFactory.java       |   5 +
 .../io/network/buffer/NetworkBufferPool.java       |   5 +
 .../partition/BoundedBlockingSubpartition.java     |  73 +++---
 .../BoundedBlockingSubpartitionReader.java         |  22 +-
 .../partition/BoundedBlockingSubpartitionType.java |  75 +++++++
 .../runtime/io/network/partition/BoundedData.java  |  80 +++++++
 .../network/partition/BufferReaderWriterUtil.java  | 250 +++++++++++++++++++++
 .../io/network/partition/BufferToByteBuffer.java   | 130 -----------
 .../network/partition/FileChannelBoundedData.java  | 160 +++++++++++++
 .../FileChannelMemoryMappedBoundedData.java        | 210 +++++++++++++++++
 ...edBuffers.java => MemoryMappedBoundedData.java} | 100 +++++----
 .../network/partition/ResultPartitionFactory.java  |  33 ++-
 .../partition/BoundedBlockingSubpartitionTest.java |  66 ++++--
 .../BoundedBlockingSubpartitionWriteReadTest.java  |  40 +++-
 ...edBuffersTest.java => BoundedDataTestBase.java} | 109 +++++----
 .../partition/BufferReaderWriterUtilTest.java      | 203 +++++++++++++++++
 .../network/partition/BufferToByteBufferTest.java  |  80 -------
 .../partition/FileChannelBoundedDataTest.java      |  40 ++++
 .../FileChannelMemoryMappedBoundedDataTest.java    |  38 ++++
 .../partition/MemoryMappedBoundedDataTest.java     |  38 ++++
 22 files changed, 1505 insertions(+), 365 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java 
b/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java
new file mode 100755
index 0000000..3672921
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The memory architecture (32 bit / 64 bit) of the current process.
+ * Note that this might be different from the actual operating system's architecture, for example
+ * when installing a 32 bit JRE in a 64 bit OS.
+ */
+public enum MemoryArchitecture {
+
+       // constants here start with an underscore because Java identifiers cannot start with a
+       // numeric character and alternatives like 'BIT_64' are not as readable
+
+       /**
+        * 32 bit memory address size.
+        */
+       _32_BIT,
+
+       /**
+        * 64 bit memory address size.
+        */
+       _64_BIT,
+
+       /**
+        * Unknown architecture, could not be determined.
+        */
+       UNKNOWN;
+
+       // 
------------------------------------------------------------------------
+
+       private static final MemoryArchitecture current = getInternal();
+
+       /**
+        * Gets the memory architecture of this process.
+        */
+       public static MemoryArchitecture get() {
+               return current;
+       }
+
+       private static MemoryArchitecture getInternal() {
+               // putting these into the method to avoid having objects on the 
heap that are not needed
+               // any more after initialization
+               final List<String> names64bit = Arrays.asList("amd64", 
"x86_64");
+               final List<String> names32bit = Arrays.asList("x86", "i386", 
"i486", "i586", "i686");
+               final String arch = System.getProperty("os.arch");
+
+               if (names64bit.contains(arch)) {
+                       return _64_BIT;
+               }
+               else if (names32bit.contains(arch)) {
+                       return _32_BIT;
+               }
+               else {
+                       return UNKNOWN;
+               }
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/MemoryArchitectureTest.java 
b/flink-core/src/test/java/org/apache/flink/util/MemoryArchitectureTest.java
new file mode 100755
index 0000000..92c829f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/MemoryArchitectureTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests for the {@link MemoryArchitecture}.
+ */
+public class MemoryArchitectureTest {
+
+       @Test
+       public void testArchitectureNotUnknown() {
+               final MemoryArchitecture arch = MemoryArchitecture.get();
+
+               assertNotEquals(MemoryArchitecture.UNKNOWN, arch);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
old mode 100644
new mode 100755
index de4c8e09..2b9ce97
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -59,4 +59,9 @@ public interface BufferPoolFactory {
         */
        void destroyBufferPool(BufferPool bufferPool) throws IOException;
 
+       /**
+        * Gets the size of the buffers in the buffer pools produced by this 
factory.
+        */
+       int getBufferSize();
+
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
old mode 100644
new mode 100755
index 51c6aa1..0247ab7
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -121,6 +121,11 @@ public class NetworkBufferPool implements 
BufferPoolFactory, MemorySegmentProvid
                                allocatedMb, availableMemorySegments.size(), 
segmentSize);
        }
 
+       @Override
+       public int getBufferSize() {
+               return memorySegmentSize;
+       }
+
        @Nullable
        public MemorySegment requestMemorySegment() {
                return availableMemorySegments.poll();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
old mode 100644
new mode 100755
index 756b0af..7a74872
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -28,8 +27,8 @@ import org.apache.flink.util.FlinkRuntimeException;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -41,9 +40,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  * in a blocking manner: The result is first produced, then consumed.
  * The result can be consumed possibly multiple times.
  *
- * <p>The implementation creates a temporary memory mapped file and writes all 
buffers to that
- * memory and serves the result from that memory. The kernel backs the mapped 
memory region
- * with physical memory and file space incrementally as new pages are filled.
+ * <p>Depending on the supplied implementation of {@link BoundedData}, the 
actual data is stored
+ * for example in a file, or in a temporary memory mapped file.
  *
  * <h2>Important Notes on Thread Safety</h2>
  *
@@ -57,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>The implementation supports multiple concurrent readers, but assumes a 
single
  * thread per reader. That same thread must also release the reader. In 
particular, after the reader
  * was released, no buffers obtained from this reader may be accessed any 
more, or segmentation
- * faults might occur.
+ * faults might occur in some implementations.
  *
  * <p>The method calls to create readers, dispose readers, and dispose the 
partition are
  * thread-safe vis-a-vis each other.
@@ -71,8 +69,8 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
        @Nullable
        private BufferConsumer currentBuffer;
 
-       /** The memory that we store the data in, via a memory mapped file. */
-       private final MemoryMappedBuffers data;
+       /** The bounded data store that we store the data in. */
+       private final BoundedData data;
 
        /** All created and not yet released readers. */
        @GuardedBy("lock")
@@ -90,25 +88,10 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
        /** Flag indicating whether the subpartition has been released. */
        private boolean isReleased;
 
-       /**
-        * Common constructor.
-        */
        public BoundedBlockingSubpartition(
                        int index,
                        ResultPartition parent,
-                       Path filePath) throws IOException {
-
-               this(index, parent, MemoryMappedBuffers.create(filePath));
-       }
-
-       /**
-        * Constructor for testing, to pass in custom MemoryMappedBuffers.
-        */
-       @VisibleForTesting
-       BoundedBlockingSubpartition(
-                       int index,
-                       ResultPartition parent,
-                       MemoryMappedBuffers data) throws IOException {
+                       BoundedData data) {
 
                super(index, parent);
 
@@ -215,7 +198,7 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
 
                        availability.notifyDataAvailable();
 
-                       final MemoryMappedBuffers.BufferSlicer dataReader = 
data.getFullBuffers();
+                       final BoundedData.Reader dataReader = 
data.createReader();
                        final BoundedBlockingSubpartitionReader reader = new 
BoundedBlockingSubpartitionReader(
                                        this, dataReader, 
numDataBuffersWritten);
                        readers.add(reader);
@@ -271,4 +254,44 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
        int getBuffersInBacklog() {
                return numDataBuffersWritten;
        }
+
+       // ---------------------------- factories 
--------------------------------
+
+       /**
+        * Creates a BoundedBlockingSubpartition that simply stores the 
partition data in a file.
+        * Data is eagerly spilled (written to disk) and readers directly read 
from the file.
+        */
+       public static BoundedBlockingSubpartition createWithFileChannel(
+                       int index, ResultPartition parent, File tempFile, int 
readBufferSize) throws IOException {
+
+               final FileChannelBoundedData bd = 
FileChannelBoundedData.create(tempFile.toPath(), readBufferSize);
+               return new BoundedBlockingSubpartition(index, parent, bd);
+       }
+
+       /**
+        * Creates a BoundedBlockingSubpartition that stores the partition data in a memory mapped file.
+        * Data is written to and read from the mapped memory region. Disk 
spilling happens lazily, when the
+        * OS swaps out the pages from the memory mapped file.
+        */
+       public static BoundedBlockingSubpartition createWithMemoryMappedFile(
+                       int index, ResultPartition parent, File tempFile) 
throws IOException {
+
+               final MemoryMappedBoundedData bd = 
MemoryMappedBoundedData.create(tempFile.toPath());
+               return new BoundedBlockingSubpartition(index, parent, bd);
+
+       }
+
+       /**
+        * Creates a BoundedBlockingSubpartition that stores the partition data 
in a file and
+        * memory maps that file for reading.
+        * Data is eagerly spilled (written to disk) and then mapped into 
memory. The main
+        * difference to the {@link #createWithMemoryMappedFile(int, 
ResultPartition, File)} variant
+        * is that no I/O is necessary when pages from the memory mapped file 
are evicted.
+        */
+       public static BoundedBlockingSubpartition 
createWithFileAndMemoryMappedReader(
+                       int index, ResultPartition parent, File tempFile) 
throws IOException {
+
+               final FileChannelMemoryMappedBoundedData bd = 
FileChannelMemoryMappedBoundedData.create(tempFile.toPath());
+               return new BoundedBlockingSubpartition(index, parent, bd);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
old mode 100644
new mode 100755
index 07a999d..f7536b9
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import 
org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.util.IOUtils;
 
 import javax.annotation.Nullable;
 
@@ -44,7 +44,7 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
        /** The reader/decoder to the memory mapped region with the data we 
currently read from.
         * Null once the reader empty or disposed.*/
        @Nullable
-       private BufferSlicer data;
+       private BoundedData.Reader dataReader;
 
        /** The remaining number of data buffers (not events) in the result. */
        private int dataBufferBacklog;
@@ -57,16 +57,16 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
         */
        BoundedBlockingSubpartitionReader(
                        BoundedBlockingSubpartition parent,
-                       BufferSlicer data,
-                       int numDataBuffers) {
+                       BoundedData.Reader dataReader,
+                       int numDataBuffers) throws IOException {
 
                checkArgument(numDataBuffers >= 0);
 
                this.parent = checkNotNull(parent);
-               this.data = checkNotNull(data);
+               this.dataReader = checkNotNull(dataReader);
                this.dataBufferBacklog = numDataBuffers;
 
-               this.nextBuffer = data.sliceNextBuffer();
+               this.nextBuffer = dataReader.nextBuffer();
        }
 
        @Nullable
@@ -83,8 +83,8 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
                        dataBufferBacklog--;
                }
 
-               assert data != null;
-               nextBuffer = data.sliceNextBuffer();
+               assert dataReader != null;
+               nextBuffer = dataReader.nextBuffer();
 
                return BufferAndBacklog.fromBufferAndLookahead(current, 
nextBuffer, dataBufferBacklog);
        }
@@ -104,9 +104,11 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
                // it is not a problem if this method executes multiple times
                isReleased = true;
 
-               // nulling these fields means thet read method and will fail 
fast
+               IOUtils.closeQuietly(dataReader);
+
+               // nulling these fields means that the read method will fail fast
                nextBuffer = null;
-               data = null;
+               dataReader = null;
 
                // Notify the parent that this one is released. This allows the 
parent to
                // eventually release all resources (when all readers are done 
and the
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
new file mode 100644
index 0000000..9b43264
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * The type of the BoundedBlockingSubpartition. Also doubles as the factory.
+ */
+public enum BoundedBlockingSubpartitionType {
+
+       /**
+        * A BoundedBlockingSubpartition type that simply stores the partition 
data in a file.
+        * Data is eagerly spilled (written to disk) and readers directly read 
from the file.
+        */
+       FILE {
+
+               @Override
+               public BoundedBlockingSubpartition create(int index, 
ResultPartition parent, File tempFile, int readBufferSize) throws IOException {
+                       return 
BoundedBlockingSubpartition.createWithFileChannel(index, parent, tempFile, 
readBufferSize);
+               }
+       },
+
+       /**
+        * A BoundedBlockingSubpartition type that stores the partition data in a memory mapped file.
+        * Data is written to and read from the mapped memory region.
+        * Disk spilling happens lazily, when the OS swaps out the pages from 
the memory mapped file.
+        */
+       MMAP {
+
+               @Override
+               public BoundedBlockingSubpartition create(int index, 
ResultPartition parent, File tempFile, int readBufferSize) throws IOException {
+                       return 
BoundedBlockingSubpartition.createWithMemoryMappedFile(index, parent, tempFile);
+               }
+       },
+
+       /**
+        * A BoundedBlockingSubpartition type that stores the partition data in a file and
+        * memory maps that file for reading.
+        * Data is eagerly spilled (written to disk) and then mapped into 
memory. The main
+        * difference to the {@link BoundedBlockingSubpartitionType#MMAP} 
variant
+        * is that no I/O is necessary when pages from the memory mapped file 
are evicted.
+        */
+       FILE_MMAP {
+
+               @Override
+               public BoundedBlockingSubpartition create(int index, 
ResultPartition parent, File tempFile, int readBufferSize) throws IOException {
+                       return 
BoundedBlockingSubpartition.createWithFileAndMemoryMappedReader(index, parent, 
tempFile);
+               }
+       };
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates BoundedBlockingSubpartition of this type.
+        */
+       public abstract BoundedBlockingSubpartition create(int index, 
ResultPartition parent, File tempFile, int readBufferSize) throws IOException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
new file mode 100755
index 0000000..4d58cf8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * BoundedData is the data store in a single bounded blocking subpartition.
+ *
+ * <h2>Life cycle</h2>
+ *
+ * <p>The BoundedData is first created during the "write phase" by writing a 
sequence of buffers
+ * through the {@link #writeBuffer(Buffer)} method.
+ * The write phase is ended by calling {@link #finishWrite()}.
+ * After the write phase is finished, the data can be read multiple times 
through readers created
+ * via {@link #createReader()}.
+ * Finally, the BoundedData is dropped / deleted by calling {@link #close()}.
+ *
+ * <h2>Thread Safety and Concurrency</h2>
+ *
+ * <p>The implementations generally make no assumptions about thread safety.
+ * The only contract is that multiple created readers must be able to work 
independently concurrently.
+ */
+interface BoundedData extends Closeable {
+
+       /**
+        * Writes this buffer to the bounded data.
+        * This call fails if the writing phase was already finished via {@link 
#finishWrite()}.
+        */
+       void writeBuffer(Buffer buffer) throws IOException;
+
+       /**
+        * Finishes the current region and prevents further writes.
+        * After calling this method, further calls to {@link 
#writeBuffer(Buffer)} will fail.
+        */
+       void finishWrite() throws IOException;
+
+       /**
+        * Gets a reader for the bounded data. Multiple readers may be created.
+        * This call only succeeds once the write phase was finished via {@link 
#finishWrite()}.
+        */
+       BoundedData.Reader createReader() throws IOException;
+
+       /**
+        * Gets the number of bytes of all written data (including the metadata 
in the buffer headers).
+        */
+       long getSize();
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A reader to the bounded data.
+        */
+       interface Reader extends Closeable {
+
+               @Nullable
+               Buffer nextBuffer() throws IOException;
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
new file mode 100755
index 0000000..445df55
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
+/**
+ * Putting and getting of a sequence of buffers to/from a FileChannel or a 
ByteBuffer.
+ * This class handles the headers, length encoding, memory slicing.
+ *
+ * <p>The encoding is the same across FileChannel and ByteBuffer, so this 
class can
+ * write to a file and read from the byte buffer that results from mapping 
this file to memory.
+ */
+final class BufferReaderWriterUtil {
+
+       static final int HEADER_LENGTH = 8;
+
+       static final int HEADER_VALUE_IS_BUFFER = 0;
+
+       static final int HEADER_VALUE_IS_EVENT = 1;
+
+       // 
------------------------------------------------------------------------
+       //  ByteBuffer read / write
+       // 
------------------------------------------------------------------------
+
+       static boolean writeBuffer(Buffer buffer, ByteBuffer memory) {
+               final int bufferSize = buffer.getSize();
+
+               if (memory.remaining() < bufferSize + HEADER_LENGTH) {
+                       return false;
+               }
+
+               memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : 
HEADER_VALUE_IS_EVENT);
+               memory.putInt(bufferSize);
+               memory.put(buffer.getNioBufferReadable());
+               return true;
+       }
+
+       @Nullable
+       static Buffer sliceNextBuffer(ByteBuffer memory) {
+               final int remaining = memory.remaining();
+
+               // we only check the correct case where data is exhausted
+               // all other cases can only occur if our write logic is wrong 
and will already throw
+               // buffer underflow exceptions which will cause the read to 
fail.
+               if (remaining == 0) {
+                       return null;
+               }
+
+               final int header = memory.getInt();
+               final int size = memory.getInt();
+
+               memory.limit(memory.position() + size);
+               ByteBuffer buf = memory.slice();
+               memory.position(memory.limit());
+               memory.limit(memory.capacity());
+
+               MemorySegment memorySegment = 
MemorySegmentFactory.wrapOffHeapMemory(buf);
+
+               return bufferFromMemorySegment(
+                               memorySegment,
+                               FreeingBufferRecycler.INSTANCE,
+                               size,
+                               header == HEADER_VALUE_IS_EVENT);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  ByteChannel read / write
+       // 
------------------------------------------------------------------------
+
+       static long writeToByteChannel(
+                       FileChannel channel,
+                       Buffer buffer,
+                       ByteBuffer[] arrayWithHeaderBuffer) throws IOException {
+
+               final ByteBuffer headerBuffer = arrayWithHeaderBuffer[0];
+               headerBuffer.clear();
+               headerBuffer.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER 
: HEADER_VALUE_IS_EVENT);
+               headerBuffer.putInt(buffer.getSize());
+               headerBuffer.flip();
+
+               final ByteBuffer dataBuffer = buffer.getNioBufferReadable();
+               arrayWithHeaderBuffer[1] = dataBuffer;
+
+               final long bytesExpected = HEADER_LENGTH + 
dataBuffer.remaining();
+
+               // The file channel implementation guarantees that all bytes 
are written when invoked
+               // because it is a blocking channel (the implementation 
mentioned it as guaranteed).
+               // However, the api docs leaves it somewhat open, so it seems 
to be an undocumented contract in the JRE.
+               // We build this safety net to be on the safe side.
+               if (bytesExpected < channel.write(arrayWithHeaderBuffer)) {
+                       writeBuffers(channel, arrayWithHeaderBuffer);
+               }
+               return bytesExpected;
+       }
+
+       static long writeToByteChannelIfBelowSize(
+                       FileChannel channel,
+                       Buffer buffer,
+                       ByteBuffer[] arrayWithHeaderBuffer,
+                       long bytesLeft) throws IOException {
+
+               if (bytesLeft >= HEADER_LENGTH + buffer.getSize()) {
+                       return writeToByteChannel(channel, buffer, 
arrayWithHeaderBuffer);
+               }
+
+               return -1L;
+       }
+
+       @Nullable
+       static Buffer readFromByteChannel(
+                       FileChannel channel,
+                       ByteBuffer headerBuffer,
+                       MemorySegment memorySegment,
+                       BufferRecycler bufferRecycler) throws IOException {
+
+               headerBuffer.clear();
+               if (!tryReadByteBuffer(channel, headerBuffer)) {
+                       return null;
+               }
+               headerBuffer.flip();
+
+               final ByteBuffer targetBuf;
+               final int header;
+               final int size;
+
+               try {
+                       header = headerBuffer.getInt();
+                       size = headerBuffer.getInt();
+                       targetBuf = memorySegment.wrap(0, size);
+               }
+               catch (BufferUnderflowException | IllegalArgumentException e) {
+                       // buffer underflow if header buffer is undersized
+                       // IllegalArgumentException if size is outside memory 
segment size
+                       throwCorruptDataException();
+                       return null; // silence compiler
+               }
+
+               readByteBufferFully(channel, targetBuf);
+
+               return bufferFromMemorySegment(memorySegment, bufferRecycler, 
size, header == HEADER_VALUE_IS_EVENT);
+       }
+
+       static ByteBuffer allocatedHeaderBuffer() {
+               ByteBuffer bb = ByteBuffer.allocateDirect(HEADER_LENGTH);
+               configureByteBuffer(bb);
+               return bb;
+       }
+
+       static ByteBuffer[] allocatedWriteBufferArray() {
+               return new ByteBuffer[] { allocatedHeaderBuffer(), null };
+       }
+
+       private static boolean tryReadByteBuffer(FileChannel channel, 
ByteBuffer b) throws IOException {
+               if (channel.read(b) == -1) {
+                       return false;
+               }
+               else {
+                       while (b.hasRemaining()) {
+                               if (channel.read(b) == -1) {
+                                       throwPrematureEndOfFile();
+                               }
+                       }
+                       return true;
+               }
+       }
+
+       private static void readByteBufferFully(FileChannel channel, ByteBuffer 
b) throws IOException {
+               // the post-checked loop here gets away with one less check in 
the normal case
+               do {
+                       if (channel.read(b) == -1) {
+                               throwPrematureEndOfFile();
+                       }
+               }
+               while (b.hasRemaining());
+       }
+
+       private static void writeBuffer(FileChannel channel, ByteBuffer buffer) 
throws IOException {
+               while (buffer.hasRemaining()) {
+                       channel.write(buffer);
+               }
+       }
+
+       private static void writeBuffers(FileChannel channel, ByteBuffer... 
buffers) throws IOException {
+               for (ByteBuffer buffer : buffers) {
+                       writeBuffer(channel, buffer);
+               }
+       }
+
+       private static void throwPrematureEndOfFile() throws IOException {
+               throw new IOException("The spill file is corrupt: premature end 
of file");
+       }
+
+       private static void throwCorruptDataException() throws IOException {
+               throw new IOException("The spill file is corrupt: buffer size 
and boundaries invalid");
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utils
+       // 
------------------------------------------------------------------------
+
+       static Buffer bufferFromMemorySegment(
+                       MemorySegment memorySegment,
+                       BufferRecycler memorySegmentRecycler,
+                       int size,
+                       boolean isEvent) {
+
+               final Buffer buffer = new NetworkBuffer(memorySegment, 
memorySegmentRecycler);
+               buffer.setSize(size);
+
+               if (isEvent) {
+                       buffer.tagAsEvent();
+               }
+
+               return buffer;
+       }
+
+       static void configureByteBuffer(ByteBuffer buffer) {
+               buffer.order(ByteOrder.nativeOrder());
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java
deleted file mode 100644
index 4cdf41a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
-
-import javax.annotation.Nullable;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
/**
 * Putting and getting of a sequence of buffers to/from a ByteBuffer.
 * This class handles the headers, length encoding, memory slicing.
 */
final class BufferToByteBuffer {

	// all fields and methods below here have package-private access to avoid bridge
	// methods when accessing them from the nested classes

	// size of the per-buffer header: 4 bytes type flag + 4 bytes length
	static final int HEADER_LENGTH = 8;

	// header flag marking the following bytes as a data buffer
	static final int HEADER_VALUE_IS_BUFFER = 0;

	// header flag marking the following bytes as a serialized event
	static final int HEADER_VALUE_IS_EVENT = 1;

	/**
	 * Validates that the given ByteBuffer is fresh (position 0, limit == capacity,
	 * large enough for at least one header) and sets it to native byte order.
	 */
	static ByteBuffer checkAndConfigureByteBuffer(ByteBuffer buffer) {
		checkArgument(buffer.position() == 0);
		checkArgument(buffer.capacity() > 8);
		checkArgument(buffer.limit() == buffer.capacity());

		return buffer.order(ByteOrder.nativeOrder());
	}

	// ------------------------------------------------------------------------

	/** Writes a sequence of buffers (each preceded by a header) into a single backing ByteBuffer. */
	static final class Writer {

		private final ByteBuffer memory;

		Writer(ByteBuffer memory) {
			this.memory = checkAndConfigureByteBuffer(memory);
		}

		/**
		 * Appends the given buffer (header + data) to the backing memory.
		 *
		 * @return true if the buffer fit, false if there was not enough space left.
		 */
		public boolean writeBuffer(Buffer buffer) {
			final ByteBuffer memory = this.memory;
			final int bufferSize = buffer.getSize();

			if (memory.remaining() < bufferSize + HEADER_LENGTH) {
				return false;
			}

			memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
			memory.putInt(bufferSize);
			memory.put(buffer.getNioBufferReadable());
			return true;
		}

		/** Flips the backing memory for reading and returns it. No writes may follow. */
		public ByteBuffer complete() {
			memory.flip();
			return memory;
		}

		// NOTE(review): only meaningful before complete() flips the buffer — after that,
		// position no longer equals the number of bytes written
		public int getNumBytes() {
			return memory.position();
		}
	}

	/** Reads back the buffers written by a {@link Writer}, slicing (not copying) the memory. */
	static final class Reader {

		private final ByteBuffer memory;

		Reader(ByteBuffer memory) {
			this.memory = checkAndConfigureByteBuffer(memory);
		}

		/** Returns the next buffer sliced from the backing memory, or null when exhausted. */
		@Nullable
		public Buffer sliceNextBuffer() {
			final ByteBuffer memory = this.memory;
			final int remaining = memory.remaining();

			// we only check the correct case where data is exhausted
			// all other cases can only occur if our write logic is wrong and will already throw
			// buffer underflow exceptions which will cause the read to fail.
			if (remaining == 0) {
				return null;
			}

			final int header = memory.getInt();
			final int size = memory.getInt();

			// temporarily shrink the limit so slice() covers exactly this buffer,
			// then restore position/limit for the next call
			memory.limit(memory.position() + size);
			ByteBuffer buf = memory.slice();
			memory.position(memory.limit());
			memory.limit(memory.capacity());

			MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf);
			Buffer buffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE);
			buffer.setSize(size);

			if (header == HEADER_VALUE_IS_EVENT) {
				buffer.tagAsEvent();
			}

			return buffer;
		}
	}
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
new file mode 100644
index 0000000..50dca60
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link BoundedData} that writes directly into a File 
Channel.
+ * The readers are simple file channel readers using a simple dedicated buffer 
pool.
+ */
+final class FileChannelBoundedData implements BoundedData {
+
+       private final Path filePath;
+
+       private final FileChannel fileChannel;
+
+       private final ByteBuffer[] headerAndBufferArray;
+
+       private long size;
+
+       private final int memorySegmentSize;
+
+       FileChannelBoundedData(
+                       Path filePath,
+                       FileChannel fileChannel,
+                       int memorySegmentSize) {
+
+               this.filePath = checkNotNull(filePath);
+               this.fileChannel = checkNotNull(fileChannel);
+               this.memorySegmentSize = memorySegmentSize;
+               this.headerAndBufferArray = 
BufferReaderWriterUtil.allocatedWriteBufferArray();
+       }
+
+       @Override
+       public void writeBuffer(Buffer buffer) throws IOException {
+               size += BufferReaderWriterUtil.writeToByteChannel(fileChannel, 
buffer, headerAndBufferArray);
+       }
+
+       @Override
+       public void finishWrite() throws IOException {
+               fileChannel.close();
+       }
+
+       @Override
+       public Reader createReader() throws IOException {
+               checkState(!fileChannel.isOpen());
+
+               final FileChannel fc = FileChannel.open(filePath, 
StandardOpenOption.READ);
+               return new FileBufferReader(fc, memorySegmentSize);
+       }
+
+       @Override
+       public long getSize() {
+               return size;
+       }
+
+       @Override
+       public void close() throws IOException {
+               IOUtils.closeQuietly(fileChannel);
+               Files.delete(filePath);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public static FileChannelBoundedData create(Path filePath, int 
memorySegmentSize) throws IOException {
+               final FileChannel fileChannel = FileChannel.open(
+                               filePath, StandardOpenOption.CREATE_NEW, 
StandardOpenOption.WRITE);
+
+               return new FileChannelBoundedData(
+                               filePath,
+                               fileChannel,
+                               memorySegmentSize);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       static final class FileBufferReader implements BoundedData.Reader, 
BufferRecycler {
+
+               private static final int NUM_BUFFERS = 2;
+
+               private final FileChannel fileChannel;
+
+               private final ByteBuffer headerBuffer;
+
+               private final ArrayDeque<MemorySegment> buffers;
+
+               FileBufferReader(FileChannel fileChannel, int bufferSize) {
+                       this.fileChannel = checkNotNull(fileChannel);
+                       this.headerBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
+                       this.buffers = new ArrayDeque<>(NUM_BUFFERS);
+
+                       for (int i = 0; i < NUM_BUFFERS; i++) {
+                               
buffers.addLast(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, 
null));
+                       }
+               }
+
+               @Nullable
+               @Override
+               public Buffer nextBuffer() throws IOException {
+                       final MemorySegment memory = buffers.pollFirst();
+
+                       if (memory != null) {
+                               final Buffer next = 
BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, 
this);
+                               if (next != null) {
+                                       return next;
+                               }
+                               else {
+                                       recycle(memory);
+                                       return null;
+                               }
+                       }
+
+                       throw new IOException("Bug in 
BoundedBlockingSubpartition with FILE data: " +
+                                       "Requesting new buffer before previous 
buffer returned.");
+               }
+
+               @Override
+               public void close() throws IOException {
+                       fileChannel.close();
+               }
+
+               @Override
+               public void recycle(MemorySegment memorySegment) {
+                       buffers.addLast(memorySegment);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
new file mode 100755
index 0000000..4a71fcd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link BoundedData} that writes directly into a File 
Channel
+ * and maps the file into memory after writing. Readers simply access the 
memory mapped
+ * data. All readers access the same memory, which is mapped in a read-only 
manner.
+ *
+ * <p>Similarly as the {@link MemoryMappedBoundedData}, this implementation 
needs to work around
+ * the fact that the memory mapped regions cannot exceed 2GB in Java. While 
the implementation writes
+ * to the a single file, the result may be multiple memory mapped buffers.
+ *
+ * <h2>Important!</h2>
+ *
+ * <p>This class performs absolutely no synchronization and relies on single 
threaded access
+ * or externally synchronized access. Concurrent access around disposal may 
cause
+ * segmentation faults!
+ */
+final class FileChannelMemoryMappedBoundedData implements BoundedData {
+
+       /** The file channel backing the memory mapped file. */
+       private final FileChannel fileChannel;
+
+       /** The reusable array with header buffer and data buffer, to use 
gathering writes on the
+        * file channel ({@link 
java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). */
+       private final ByteBuffer[] headerAndBufferArray;
+
+       /** All memory mapped regions. */
+       private final ArrayList<ByteBuffer> memoryMappedRegions;
+
+       /** The path of the memory mapped file. */
+       private final Path filePath;
+
+       /** The position in the file channel. Cached for efficiency, because an 
actual position
+        * lookup in the channel involves various locks and checks. */
+       private long pos;
+
+       /** The position where the current memory mapped region must end. */
+       private long endOfCurrentRegion;
+
+       /** The position where the current memory mapped started. */
+       private long startOfCurrentRegion;
+
+       /** The maximum size of each mapped region. */
+       private final long maxRegionSize;
+
+       FileChannelMemoryMappedBoundedData(
+                       Path filePath,
+                       FileChannel fileChannel,
+                       int maxSizePerMappedRegion) {
+
+               this.filePath = filePath;
+               this.fileChannel = fileChannel;
+               this.headerAndBufferArray = 
BufferReaderWriterUtil.allocatedWriteBufferArray();
+               this.memoryMappedRegions = new ArrayList<>(4);
+               this.maxRegionSize = maxSizePerMappedRegion;
+               this.endOfCurrentRegion = maxSizePerMappedRegion;
+       }
+
+       @Override
+       public void writeBuffer(Buffer buffer) throws IOException {
+               if (tryWriteBuffer(buffer)) {
+                       return;
+               }
+
+               mapRegionAndStartNext();
+
+               if (!tryWriteBuffer(buffer)) {
+                       throwTooLargeBuffer(buffer);
+               }
+       }
+
+       private boolean tryWriteBuffer(Buffer buffer) throws IOException {
+               final long spaceLeft = endOfCurrentRegion - pos;
+               final long bytesWritten = 
BufferReaderWriterUtil.writeToByteChannelIfBelowSize(
+                               fileChannel, buffer, headerAndBufferArray, 
spaceLeft);
+
+               if (bytesWritten >= 0) {
+                       pos += bytesWritten;
+                       return true;
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public BoundedData.Reader createReader() {
+               checkState(!fileChannel.isOpen());
+
+               final List<ByteBuffer> buffers = memoryMappedRegions.stream()
+                               .map((bb) -> 
bb.duplicate().order(ByteOrder.nativeOrder()))
+                               .collect(Collectors.toList());
+
+               return new MemoryMappedBoundedData.BufferSlicer(buffers);
+       }
+
+       /**
+        * Finishes the current region and prevents further writes.
+        * After calling this method, further calls to {@link 
#writeBuffer(Buffer)} will fail.
+        */
+       @Override
+       public void finishWrite() throws IOException {
+               mapRegionAndStartNext();
+               fileChannel.close();
+       }
+
+       /**
+        * Closes the file and unmaps all memory mapped regions.
+        * After calling this method, access to any ByteBuffer obtained from 
this instance
+        * will cause a segmentation fault.
+        */
+       public void close() throws IOException {
+               IOUtils.closeQuietly(fileChannel);
+
+               for (ByteBuffer bb : memoryMappedRegions) {
+                       PlatformDependent.freeDirectBuffer(bb);
+               }
+               memoryMappedRegions.clear();
+
+               // To make this compatible with all versions of Windows, we 
must wait with
+               // deleting the file until it is unmapped.
+               // See also 
https://stackoverflow.com/questions/11099295/file-flag-delete-on-close-and-memory-mapped-files/51649618#51649618
+
+               Files.delete(filePath);
+       }
+
+       @Override
+       public long getSize() {
+               return pos;
+       }
+
+       private void mapRegionAndStartNext() throws IOException {
+               final ByteBuffer region = fileChannel.map(MapMode.READ_ONLY, 
startOfCurrentRegion, pos - startOfCurrentRegion);
+               region.order(ByteOrder.nativeOrder());
+               memoryMappedRegions.add(region);
+
+               startOfCurrentRegion = pos;
+               endOfCurrentRegion = startOfCurrentRegion + maxRegionSize;
+       }
+
+       private void throwTooLargeBuffer(Buffer buffer) throws IOException {
+               throw new IOException(String.format(
+                               "The buffer (%d bytes) is larger than the 
maximum size of a memory buffer (%d bytes)",
+                               buffer.getSize(), maxRegionSize));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Factories
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates new FileChannelMemoryMappedBoundedData, creating a memory 
mapped file at the given path.
+        */
+       public static FileChannelMemoryMappedBoundedData create(Path 
memMappedFilePath) throws IOException {
+               return createWithRegionSize(memMappedFilePath, 
Integer.MAX_VALUE);
+       }
+
+       /**
+        * Creates new FileChannelMemoryMappedBoundedData, creating a memory 
mapped file at the given path.
+        * Each mapped region (= ByteBuffer) will be of the given size.
+        */
+       public static FileChannelMemoryMappedBoundedData 
createWithRegionSize(Path memMappedFilePath, int regionSize) throws IOException 
{
+               checkNotNull(memMappedFilePath, "memMappedFilePath");
+               checkArgument(regionSize > 0, "regions size most be > 0");
+
+               final FileChannel fileChannel = 
FileChannel.open(memMappedFilePath,
+                               StandardOpenOption.READ, 
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
+
+               return new 
FileChannelMemoryMappedBoundedData(memMappedFilePath, fileChannel, regionSize);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
old mode 100644
new mode 100755
similarity index 73%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
index f2973b8..502c64c
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
@@ -19,16 +19,15 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer;
 import org.apache.flink.util.IOUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 import java.nio.file.Files;
@@ -42,24 +41,14 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * This class is largely a workaround for the fact that a memory mapped region 
in Java can cannot
+ * An implementation of {@link BoundedData} simply through ByteBuffers backed 
by memory, typically
+ * from a memory mapped file, so the data gets automatically evicted from 
memory if it grows large.
+ *
+ * <p>Most of the code in this class is a workaround for the fact that a 
memory mapped region in Java cannot
  * be larger than 2GB (== signed 32 bit int max value). The class takes {@link 
Buffer Buffers} and
- * writes them to several memory mapped region, using the {@link 
BufferToByteBuffer}
+ * writes them to several memory mapped region, using the {@link 
BufferReaderWriterUtil}
  * class.
  *
- * <h2>Usage</h2>
- *
- * <p>The class assumes in the first phase that data is written by repeatedly 
calling
- * {@link #writeBuffer(Buffer)}. That puts the data into the memory region of 
the memory
- * mapped file. After writing, one must call {@link #finishWrite()}.
- *
- * <p>After that, the class can produce multiple {@link BufferSlicer} 
instances to re-read
- * the data from the memory regions. Multiple slicers can read concurrently, 
but each slicer
- * should be read from by a single thread.
- *
- * <p>Eventually, the resources must be disposed via {@link #close()}. After 
that,
- * no reading can happen any more.
- *
  * <h2>Important!</h2>
  *
  * <p>This class performs absolutely no synchronization and relies on single 
threaded access
@@ -67,18 +56,18 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * segmentation faults!
  *
  * <p>This class does limited sanity checks and assumes correct use from 
{@link BoundedBlockingSubpartition}
- * and {@link BoundedBlockingSubpartitionReader}, such as writing first and 
rading after.
+ * and {@link BoundedBlockingSubpartitionReader}, such as writing first and 
reading after.
  * Not obeying these contracts throws NullPointerExceptions.
  */
-class MemoryMappedBuffers implements Closeable {
+final class MemoryMappedBoundedData implements BoundedData {
 
        /** Memory mappings should be at the granularity of page sizes, for 
efficiency. */
        private static final int PAGE_SIZE = 
PageSizeUtil.getSystemPageSizeOrConservativeMultiple();
 
-       /** The encoder to the current memory mapped region we are writing to.
+       /** The current memory mapped region we are writing to.
         * This value is null once writing has finished or the buffers are 
disposed. */
        @Nullable
-       private BufferToByteBuffer.Writer currentBuffer;
+       private ByteBuffer currentBuffer;
 
        /** All memory mapped regions that are already full (completed). */
        private final ArrayList<ByteBuffer> fullBuffers;
@@ -95,7 +84,7 @@ class MemoryMappedBuffers implements Closeable {
        /** The size of each mapped region. */
        private final long mappingSize;
 
-       MemoryMappedBuffers(
+       MemoryMappedBoundedData(
                        Path filePath,
                        FileChannel fileChannel,
                        int maxSizePerByteBuffer) throws IOException {
@@ -108,25 +97,27 @@ class MemoryMappedBuffers implements Closeable {
                rollOverToNextBuffer();
        }
 
-       void writeBuffer(Buffer buffer) throws IOException {
+       @Override
+       public void writeBuffer(Buffer buffer) throws IOException {
                assert currentBuffer != null;
 
-               if (currentBuffer.writeBuffer(buffer)) {
+               if (BufferReaderWriterUtil.writeBuffer(buffer, currentBuffer)) {
                        return;
                }
 
                rollOverToNextBuffer();
 
-               if (!currentBuffer.writeBuffer(buffer)) {
+               if (!BufferReaderWriterUtil.writeBuffer(buffer, currentBuffer)) 
{
                        throwTooLargeBuffer(buffer);
                }
        }
 
-       BufferSlicer getFullBuffers() {
+       @Override
+       public BufferSlicer createReader() {
                assert currentBuffer == null;
 
                final List<ByteBuffer> buffers = fullBuffers.stream()
-                               .map(ByteBuffer::slice)
+                               .map((bb) -> 
bb.slice().order(ByteOrder.nativeOrder()))
                                .collect(Collectors.toList());
 
                return new BufferSlicer(buffers);
@@ -136,10 +127,12 @@ class MemoryMappedBuffers implements Closeable {
         * Finishes the current region and prevents further writes.
         * After calling this method, further calls to {@link 
#writeBuffer(Buffer)} will fail.
         */
-       void finishWrite() throws IOException {
+       @Override
+       public void finishWrite() throws IOException {
                assert currentBuffer != null;
 
-               fullBuffers.add(currentBuffer.complete());
+               currentBuffer.flip();
+               fullBuffers.add(currentBuffer);
                currentBuffer = null; // fail further writes fast
                file.close(); // won't map further regions from now on
        }
@@ -158,7 +151,7 @@ class MemoryMappedBuffers implements Closeable {
                fullBuffers.clear();
 
                if (currentBuffer != null) {
-                       
PlatformDependent.freeDirectBuffer(currentBuffer.complete());
+                       PlatformDependent.freeDirectBuffer(currentBuffer);
                        currentBuffer = null;
                }
 
@@ -172,13 +165,14 @@ class MemoryMappedBuffers implements Closeable {
        /**
         * Gets the number of bytes of all written data (including the metadata 
in the buffer headers).
         */
-       long getSize() {
+       @Override
+       public long getSize() {
                long size = 0L;
                for (ByteBuffer bb : fullBuffers) {
                        size += bb.remaining();
                }
                if (currentBuffer != null) {
-                       size += currentBuffer.getNumBytes();
+                       size += currentBuffer.position();
                }
                return size;
        }
@@ -187,11 +181,12 @@ class MemoryMappedBuffers implements Closeable {
                if (currentBuffer != null) {
                        // we need to remember the original buffers, not any 
slices.
                        // slices have no cleaner, which we need to trigger 
explicit unmapping
-                       fullBuffers.add(currentBuffer.complete());
+                       currentBuffer.flip();
+                       fullBuffers.add(currentBuffer);
                }
 
-               final ByteBuffer mapped = file.map(MapMode.READ_WRITE, 
nextMappingOffset, mappingSize);
-               currentBuffer = new Writer(mapped);
+               currentBuffer = file.map(MapMode.READ_WRITE, nextMappingOffset, 
mappingSize);
+               currentBuffer.order(ByteOrder.nativeOrder());
                nextMappingOffset += mappingSize;
        }
 
@@ -220,11 +215,11 @@ class MemoryMappedBuffers implements Closeable {
         * The "reader" for the memory region. It slices a sequence of buffers 
from the
         * sequence of mapped ByteBuffers.
         */
-       static final class BufferSlicer {
+       static final class BufferSlicer implements BoundedData.Reader {
 
-               /** The reader/decoder to the memory mapped region with the 
data we currently read from.
+               /** The memory mapped region we currently read from.
                 * Max 2GB large. Further regions may be in the {@link 
#furtherData} field. */
-               private BufferToByteBuffer.Reader data;
+               private ByteBuffer currentData;
 
                /** Further byte buffers, to handle cases where there is more 
data than fits into
                 * one mapped byte buffer (2GB = Integer.MAX_VALUE). */
@@ -232,16 +227,17 @@ class MemoryMappedBuffers implements Closeable {
 
                BufferSlicer(Iterable<ByteBuffer> data) {
                        this.furtherData = data.iterator();
-                       this.data = new 
BufferToByteBuffer.Reader(furtherData.next());
+                       this.currentData = furtherData.next();
                }
 
+               @Override
                @Nullable
-               public Buffer sliceNextBuffer() {
+               public Buffer nextBuffer() {
                        // should only be null once empty or disposed, in which 
case this method
                        // should not be called any more
-                       assert data != null;
+                       assert currentData != null;
 
-                       final Buffer next = data.sliceNextBuffer();
+                       final Buffer next = 
BufferReaderWriterUtil.sliceNextBuffer(currentData);
                        if (next != null) {
                                return next;
                        }
@@ -250,8 +246,14 @@ class MemoryMappedBuffers implements Closeable {
                                return null;
                        }
 
-                       data = new 
BufferToByteBuffer.Reader(furtherData.next());
-                       return sliceNextBuffer();
+                       currentData = furtherData.next();
+                       return nextBuffer();
+               }
+
+               @Override
+               public void close() throws IOException {
+                       // nothing to do, this class holds no actual resources 
of its own,
+                       // only references to the mapped byte buffers
                }
        }
 
@@ -260,20 +262,20 @@ class MemoryMappedBuffers implements Closeable {
        // 
------------------------------------------------------------------------
 
        /**
-        * Creates new MemoryMappedBuffers, creating a memory mapped file at 
the given path.
+        * Creates new MemoryMappedBoundedData, creating a memory mapped file 
at the given path.
         */
-       public static MemoryMappedBuffers create(Path memMappedFilePath) throws 
IOException {
+       public static MemoryMappedBoundedData create(Path memMappedFilePath) 
throws IOException {
                return createWithRegionSize(memMappedFilePath, 
Integer.MAX_VALUE);
        }
 
        /**
-        * Creates new MemoryMappedBuffers, creating a memory mapped file at 
the given path.
+        * Creates new MemoryMappedBoundedData, creating a memory mapped file 
at the given path.
         * Each mapped region (= ByteBuffer) will be of the given size.
         */
-       public static MemoryMappedBuffers createWithRegionSize(Path 
memMappedFilePath, int regionSize) throws IOException {
+       public static MemoryMappedBoundedData createWithRegionSize(Path 
memMappedFilePath, int regionSize) throws IOException {
                final FileChannel fileChannel = 
FileChannel.open(memMappedFilePath,
                                StandardOpenOption.READ, 
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
 
-               return new MemoryMappedBuffers(memMappedFilePath, fileChannel, 
regionSize);
+               return new MemoryMappedBoundedData(memMappedFilePath, 
fileChannel, regionSize);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
old mode 100644
new mode 100755
index 6fc5cfc..137ea5f
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.MemoryArchitecture;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.slf4j.Logger;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Optional;
 
@@ -41,8 +43,11 @@ import java.util.Optional;
  * Factory for {@link ResultPartition} to use in {@link 
NettyShuffleEnvironment}.
  */
 public class ResultPartitionFactory {
+
        private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartitionFactory.class);
 
+       private static final BoundedBlockingSubpartitionType 
BOUNDED_BLOCKING_TYPE = getBoundedBlockingType();
+
        @Nonnull
        private final ResultPartitionManager partitionManager;
 
@@ -114,7 +119,7 @@ public class ResultPartitionFactory {
                                partitionManager,
                                bufferPoolFactory);
 
-               createSubpartitions(partition, type, subpartitions);
+               createSubpartitions(partition, type, subpartitions, 
this.bufferPoolFactory.getBufferSize());
 
                LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
 
@@ -122,12 +127,15 @@ public class ResultPartitionFactory {
        }
 
        private void createSubpartitions(
-               ResultPartition partition, ResultPartitionType type, 
ResultSubpartition[] subpartitions) {
+                       ResultPartition partition,
+                       ResultPartitionType type,
+                       ResultSubpartition[] subpartitions,
+                       int networkBufferSize) {
 
                // Create the subpartitions.
                switch (type) {
                        case BLOCKING:
-                               
initializeBoundedBlockingPartitions(subpartitions, partition, ioManager);
+                               
initializeBoundedBlockingPartitions(subpartitions, partition, ioManager, 
networkBufferSize);
                                break;
 
                        case PIPELINED:
@@ -146,13 +154,14 @@ public class ResultPartitionFactory {
        private static void initializeBoundedBlockingPartitions(
                ResultSubpartition[] subpartitions,
                ResultPartition parent,
-               IOManager ioManager) {
+               IOManager ioManager,
+               int networkBufferSize) {
 
                int i = 0;
                try {
                        for (; i < subpartitions.length; i++) {
-                               subpartitions[i] = new 
BoundedBlockingSubpartition(
-                                       i, parent, 
ioManager.createChannel().getPathFile().toPath());
+                               final File spillFile = 
ioManager.createChannel().getPathFile();
+                               subpartitions[i] = 
BOUNDED_BLOCKING_TYPE.create(i, parent, spillFile, networkBufferSize);
                        }
                }
                catch (IOException e) {
@@ -188,4 +197,16 @@ public class ResultPartitionFactory {
                                type.hasBackPressure() ? Optional.empty() : 
Optional.of(p));
                };
        }
+
+       private static BoundedBlockingSubpartitionType getBoundedBlockingType() 
{
+               switch (MemoryArchitecture.get()) {
+                       case _64_BIT:
+                               return 
BoundedBlockingSubpartitionType.FILE_MMAP;
+                       case _32_BIT:
+                               return BoundedBlockingSubpartitionType.FILE;
+                       default:
+                               LOG.warn("Cannot determine memory architecture. 
Using pure file-based shuffle.");
+                               return BoundedBlockingSubpartitionType.FILE;
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index 5051b90..e76cd1e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -24,12 +24,12 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
 
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -59,12 +59,24 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                partition.release();
        }
 
+       @Test
+       public void testClosingClosesBoundedData() throws Exception {
+               final TestingBoundedDataReader reader = new 
TestingBoundedDataReader();
+               final BoundedBlockingSubpartitionReader bbspr = new 
BoundedBlockingSubpartitionReader(
+                               (BoundedBlockingSubpartition) 
createSubpartition(), reader, 10);
+
+               bbspr.releaseAllResources();
+
+               assertTrue(reader.closed);
+       }
+
        // 
------------------------------------------------------------------------
 
        @Override
        ResultSubpartition createSubpartition() throws Exception {
                final ResultPartition resultPartition = 
PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING);
-               return new BoundedBlockingSubpartition(0, resultPartition, 
tmpPath());
+               return BoundedBlockingSubpartition.createWithMemoryMappedFile(
+                               0, resultPartition, new 
File(TMP_DIR.newFolder(), "subpartition"));
        }
 
        @Override
@@ -74,32 +86,50 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                return new BoundedBlockingSubpartition(
                                0,
                                resultPartition,
-                               FailingMemory.create());
+                               new FailingBoundedData());
        }
 
        // 
------------------------------------------------------------------------
 
-       static Path tmpPath() throws IOException {
-               return new File(TMP_DIR.newFolder(), "subpartition").toPath();
-       }
+       private static class FailingBoundedData implements BoundedData {
 
-       // 
------------------------------------------------------------------------
+               @Override
+               public void writeBuffer(Buffer buffer) throws IOException {
+                       throw new IOException("test");
+               }
+
+               @Override
+               public void finishWrite() throws IOException {
+                       throw new UnsupportedOperationException();
+               }
 
-       private static class FailingMemory extends MemoryMappedBuffers {
+               @Override
+               public Reader createReader() throws IOException {
+                       throw new UnsupportedOperationException();
+               }
 
-               FailingMemory(Path path, FileChannel fc) throws IOException {
-                       super(path, fc, Integer.MAX_VALUE);
+               @Override
+               public long getSize() {
+                       throw new UnsupportedOperationException();
                }
 
                @Override
-               void writeBuffer(Buffer buffer) throws IOException {
-                       throw new IOException("test");
+               public void close() {}
+       }
+
+       private static class TestingBoundedDataReader implements 
BoundedData.Reader {
+
+               boolean closed;
+
+               @Nullable
+               @Override
+               public Buffer nextBuffer() throws IOException {
+                       return null;
                }
 
-               static FailingMemory create() throws IOException {
-                       Path p = tmpPath();
-                       FileChannel fc = FileChannel.open(p, 
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
-                       return new FailingMemory(p, fc);
+               @Override
+               public void close() throws IOException {
+                       closed = true;
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
index 46dbb05..359ad8d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -27,10 +27,17 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAn
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -38,11 +45,32 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests that read the BoundedBlockingSubpartition with multiple threads in 
parallel.
  */
+@RunWith(Parameterized.class)
 public class BoundedBlockingSubpartitionWriteReadTest {
 
        @ClassRule
        public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
+       private static final int BUFFER_SIZE = 1024 * 1024;
+
+       // 
------------------------------------------------------------------------
+       //  parameters
+       // 
------------------------------------------------------------------------
+
+       @Parameters(name = "type = {0}")
+       public static Collection<Object[]> modes() {
+               return Arrays.stream(BoundedBlockingSubpartitionType.values())
+                               .map((type) -> new Object[] { type })
+                               .collect(Collectors.toList());
+       }
+
+       @Parameter
+       public BoundedBlockingSubpartitionType type;
+
+       // 
------------------------------------------------------------------------
+       //  tests
+       // 
------------------------------------------------------------------------
+
        @Test
        public void testWriteAndReadData() throws Exception {
                final int numLongs = 15_000_000; // roughly 115 MiBytes
@@ -118,6 +146,7 @@ public class BoundedBlockingSubpartitionWriteReadTest {
                                assertEquals(expectedNextLong++, 
buffer.getLong());
                        }
 
+                       next.buffer().recycleBuffer();
                        nextExpectedBacklog--;
                }
 
@@ -130,7 +159,7 @@ public class BoundedBlockingSubpartitionWriteReadTest {
        // 
------------------------------------------------------------------------
 
        private static void writeLongs(BoundedBlockingSubpartition partition, 
long nums) throws IOException {
-               final MemorySegment memory = 
MemorySegmentFactory.allocateUnpooledSegment(1024 * 1024);
+               final MemorySegment memory = 
MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
 
                long l = 0;
                while (nums > 0) {
@@ -149,18 +178,19 @@ public class BoundedBlockingSubpartitionWriteReadTest {
                }
        }
 
-       private static BoundedBlockingSubpartition createAndFillPartition(long 
numLongs) throws IOException {
+       private BoundedBlockingSubpartition createAndFillPartition(long 
numLongs) throws IOException {
                BoundedBlockingSubpartition subpartition = createSubpartition();
                writeLongs(subpartition, numLongs);
                subpartition.finish();
                return subpartition;
        }
 
-       private static BoundedBlockingSubpartition createSubpartition() throws 
IOException {
-               return new BoundedBlockingSubpartition(
+       private BoundedBlockingSubpartition createSubpartition() throws 
IOException {
+               return type.create(
                                0,
                                
PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING),
-                               new File(TMP_FOLDER.newFolder(), 
"partitiondata").toPath());
+                               new File(TMP_FOLDER.newFolder(), 
"partitiondata"),
+                               BUFFER_SIZE);
        }
 
        private static LongReader[] createSubpartitionLongReaders(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
similarity index 53%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
index eec7dba..bd7421c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import 
org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer;
 
 import org.hamcrest.Matchers;
 import org.junit.ClassRule;
@@ -34,7 +33,6 @@ import java.nio.file.Path;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -42,114 +40,141 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests that read the BoundedBlockingSubpartition with multiple threads in 
parallel.
  */
-public class MemoryMappedBuffersTest {
+public abstract class BoundedDataTestBase {
 
        @ClassRule
        public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
+       /** Max buffer size used by the tests that write data. For 
implementations that need
+        * to instantiate buffers in the read side. */
+       protected static final int BUFFER_SIZE = 1024 * 1024; // 1 MiByte
+
+       // 
------------------------------------------------------------------------
+       //  BoundedData Instantiation
+       // 
------------------------------------------------------------------------
+
+       protected abstract BoundedData createBoundedData(Path tempFilePath) 
throws IOException;
+
+       protected abstract BoundedData createBoundedDataWithRegion(Path 
tempFilePath, int regionSize) throws IOException;
+
+       private BoundedData createBoundedData() throws IOException {
+               return createBoundedData(createTempPath());
+       }
+
+       private BoundedData createBoundedDataWithRegion(int regionSize) throws 
IOException {
+               return createBoundedDataWithRegion(createTempPath(), 
regionSize);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Tests
+       // 
------------------------------------------------------------------------
+
        @Test
        public void testWriteAndReadData() throws Exception {
-               testWriteAndReadData(10_000_000, Integer.MAX_VALUE);
+               try (BoundedData bd = createBoundedData()) {
+                       testWriteAndReadData(bd);
+               }
        }
 
        @Test
        public void testWriteAndReadDataAcrossRegions() throws Exception {
-               testWriteAndReadData(10_000_000, 1_276_347);
+               try (BoundedData bd = createBoundedDataWithRegion(1_276_347)) {
+                       testWriteAndReadData(bd);
+               }
        }
 
-       private static void testWriteAndReadData(int numInts, int regionSize) 
throws Exception {
-               try (MemoryMappedBuffers memory = 
MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) {
-                       final int numBuffers = writeInts(memory, numInts);
-                       memory.finishWrite();
+       private void testWriteAndReadData(BoundedData bd) throws Exception {
+               final int numInts = 10_000_000;
+               final int numBuffers = writeInts(bd, numInts);
+               bd.finishWrite();
 
-                       readInts(memory.getFullBuffers(), numBuffers, numInts);
-               }
+               readInts(bd.createReader(), numBuffers, numInts);
        }
 
        @Test
        public void returnNullAfterEmpty() throws Exception {
-               try (MemoryMappedBuffers memory = 
MemoryMappedBuffers.create(createTempPath())) {
-                       
memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer());
-                       memory.finishWrite();
+               try (BoundedData bd = createBoundedData()) {
+                       bd.finishWrite();
 
-                       final BufferSlicer reader = memory.getFullBuffers();
-                       assertNotNull(reader.sliceNextBuffer());
+                       final BoundedData.Reader reader = bd.createReader();
 
                        // check that multiple calls now return empty buffers
-                       assertNull(reader.sliceNextBuffer());
-                       assertNull(reader.sliceNextBuffer());
-                       assertNull(reader.sliceNextBuffer());
+                       assertNull(reader.nextBuffer());
+                       assertNull(reader.nextBuffer());
+                       assertNull(reader.nextBuffer());
                }
        }
 
        @Test
        public void testDeleteFileOnClose() throws Exception {
                final Path path = createTempPath();
-               final MemoryMappedBuffers mmb = 
MemoryMappedBuffers.create(path);
+               final BoundedData bd = createBoundedData(path);
                assertTrue(Files.exists(path));
 
-               mmb.close();
+               bd.close();
 
                assertFalse(Files.exists(path));
        }
 
        @Test
        public void testGetSizeSingleRegion() throws Exception {
-               testGetSize(Integer.MAX_VALUE);
+               try (BoundedData bd = createBoundedData()) {
+                       testGetSize(bd);
+               }
        }
 
        @Test
        public void testGetSizeMultipleRegions() throws Exception {
-               testGetSize(100_000);
+               try (BoundedData bd = createBoundedDataWithRegion(100_000)) {
+                       testGetSize(bd);
+               }
        }
 
-       private static void testGetSize(int regionSize) throws Exception {
+       private static void testGetSize(BoundedData bd) throws Exception {
                final int bufferSize1 = 60_787;
                final int bufferSize2 = 76_687;
-               final int expectedSize1 = bufferSize1 + 
BufferToByteBuffer.HEADER_LENGTH;
-               final int expectedSizeFinal = bufferSize1 + bufferSize2 + 2 * 
BufferToByteBuffer.HEADER_LENGTH;
-
-               try (MemoryMappedBuffers memory = 
MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) {
+               final int expectedSize1 = bufferSize1 + 
BufferReaderWriterUtil.HEADER_LENGTH;
+               final int expectedSizeFinal = bufferSize1 + bufferSize2 + 2 * 
BufferReaderWriterUtil.HEADER_LENGTH;
 
-                       
memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize1));
-                       assertEquals(expectedSize1, memory.getSize());
+               
bd.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize1));
+               assertEquals(expectedSize1, bd.getSize());
 
-                       
memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize2));
-                       assertEquals(expectedSizeFinal, memory.getSize());
+               
bd.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize2));
+               assertEquals(expectedSizeFinal, bd.getSize());
 
-                       memory.finishWrite();
-                       assertEquals(expectedSizeFinal, memory.getSize());
-               }
+               bd.finishWrite();
+               assertEquals(expectedSizeFinal, bd.getSize());
        }
 
        // 
------------------------------------------------------------------------
        //  utils
        // 
------------------------------------------------------------------------
 
-       private static int writeInts(MemoryMappedBuffers memory, int numInts) 
throws IOException {
-               final int bufferSize = 1024 * 1024; // 1 MiByte
-               final int numIntsInBuffer = bufferSize / 4;
+       private static int writeInts(BoundedData bd, int numInts) throws 
IOException {
+               final int numIntsInBuffer = BUFFER_SIZE / 4;
                int numBuffers = 0;
 
                for (int nextValue = 0; nextValue < numInts; nextValue += 
numIntsInBuffer) {
-                       Buffer buffer = 
BufferBuilderTestUtils.buildBufferWithAscendingInts(bufferSize, 
numIntsInBuffer, nextValue);
-                       memory.writeBuffer(buffer);
+                       Buffer buffer = 
BufferBuilderTestUtils.buildBufferWithAscendingInts(BUFFER_SIZE, 
numIntsInBuffer, nextValue);
+                       bd.writeBuffer(buffer);
                        numBuffers++;
                }
 
                return numBuffers;
        }
 
-       private static void readInts(MemoryMappedBuffers.BufferSlicer memory, 
int numBuffersExpected, int numInts) throws IOException {
+       private static void readInts(BoundedData.Reader reader, int 
numBuffersExpected, int numInts) throws IOException {
                Buffer b;
                int nextValue = 0;
                int numBuffers = 0;
 
-               while ((b = memory.sliceNextBuffer()) != null) {
+               while ((b = reader.nextBuffer()) != null) {
                        final int numIntsInBuffer = b.getSize() / 4;
                        
BufferBuilderTestUtils.validateBufferWithAscendingInts(b, numIntsInBuffer, 
nextValue);
                        nextValue += numIntsInBuffer;
                        numBuffers++;
+
+                       b.recycleBuffer();
                }
 
                assertEquals(numBuffersExpected, numBuffers);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java
new file mode 100644
index 0000000..606dedf
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.StandardOpenOption;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link BufferReaderWriterUtil}.
+ */
+public class BufferReaderWriterUtilTest {
+
+       @ClassRule
+       public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+       // 
------------------------------------------------------------------------
+       // Byte Buffer
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void writeReadByteBuffer() {
+               final ByteBuffer memory = ByteBuffer.allocateDirect(1200);
+               final Buffer buffer = createTestBuffer();
+
+               BufferReaderWriterUtil.writeBuffer(buffer, memory);
+               final int pos = memory.position();
+               memory.flip();
+               Buffer result = BufferReaderWriterUtil.sliceNextBuffer(memory);
+
+               assertEquals(pos, memory.position());
+               validateTestBuffer(result);
+       }
+
+       @Test
+       public void writeByteBufferNotEnoughSpace() {
+               final ByteBuffer memory = ByteBuffer.allocateDirect(10);
+               final Buffer buffer = createTestBuffer();
+
+               final boolean written = 
BufferReaderWriterUtil.writeBuffer(buffer, memory);
+
+               assertFalse(written);
+               assertEquals(0, memory.position());
+               assertEquals(memory.capacity(), memory.limit());
+       }
+
+       @Test
+       public void readFromEmptyByteBuffer() {
+               final ByteBuffer memory = ByteBuffer.allocateDirect(100);
+               memory.position(memory.limit());
+
+               final Buffer result = 
BufferReaderWriterUtil.sliceNextBuffer(memory);
+
+               assertNull(result);
+       }
+
+       @Test
+       public void testReadFromByteBufferNotEnoughData() {
+               final ByteBuffer memory = ByteBuffer.allocateDirect(1200);
+               final Buffer buffer = createTestBuffer();
+               BufferReaderWriterUtil.writeBuffer(buffer, memory);
+
+               memory.flip().limit(memory.limit() - 1);
+               ByteBuffer tooSmall = memory.slice();
+
+               try {
+                       BufferReaderWriterUtil.sliceNextBuffer(tooSmall);
+                       fail();
+               }
+               catch (Exception e) {
+                       // expected
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  File Channel
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void writeReadFileChannel() throws Exception {
+               final FileChannel fc = tmpFileChannel();
+               final Buffer buffer = createTestBuffer();
+               final MemorySegment readBuffer = 
MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null);
+
+               BufferReaderWriterUtil.writeToByteChannel(fc, buffer, 
BufferReaderWriterUtil.allocatedWriteBufferArray());
+               fc.position(0);
+
+               Buffer result = BufferReaderWriterUtil.readFromByteChannel(
+                               fc, 
BufferReaderWriterUtil.allocatedHeaderBuffer(), readBuffer, 
FreeingBufferRecycler.INSTANCE);
+
+               validateTestBuffer(result);
+       }
+
+       @Test
+       public void readPrematureEndOfFile1() throws Exception {
+               final FileChannel fc = tmpFileChannel();
+               final Buffer buffer = createTestBuffer();
+               final MemorySegment readBuffer = 
MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null);
+
+               BufferReaderWriterUtil.writeToByteChannel(fc, buffer, 
BufferReaderWriterUtil.allocatedWriteBufferArray());
+               fc.truncate(fc.position() - 1);
+               fc.position(0);
+
+               try {
+                       BufferReaderWriterUtil.readFromByteChannel(
+                                       fc, 
BufferReaderWriterUtil.allocatedHeaderBuffer(), readBuffer, 
FreeingBufferRecycler.INSTANCE);
+                       fail();
+               }
+               catch (IOException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void readPrematureEndOfFile2() throws Exception {
+               final FileChannel fc = tmpFileChannel();
+               final Buffer buffer = createTestBuffer();
+               final MemorySegment readBuffer = 
MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null);
+
+               BufferReaderWriterUtil.writeToByteChannel(fc, buffer, 
BufferReaderWriterUtil.allocatedWriteBufferArray());
+               fc.truncate(2); // less than a header size
+               fc.position(0);
+
+               try {
+                       BufferReaderWriterUtil.readFromByteChannel(
+                                       fc, 
BufferReaderWriterUtil.allocatedHeaderBuffer(), readBuffer, 
FreeingBufferRecycler.INSTANCE);
+                       fail();
+               }
+               catch (IOException e) {
+                       // expected
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Mixed
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void writeFileReadMemoryBuffer() throws Exception {
+               final FileChannel fc = tmpFileChannel();
+               final Buffer buffer = createTestBuffer();
+               BufferReaderWriterUtil.writeToByteChannel(fc, buffer, 
BufferReaderWriterUtil.allocatedWriteBufferArray());
+
+               final ByteBuffer bb = fc.map(MapMode.READ_ONLY, 0, 
fc.position()).order(ByteOrder.nativeOrder());
+               BufferReaderWriterUtil.configureByteBuffer(bb);
+               fc.close();
+
+               Buffer result = BufferReaderWriterUtil.sliceNextBuffer(bb);
+
+               validateTestBuffer(result);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Util
+       // 
------------------------------------------------------------------------
+
+       private static FileChannel tmpFileChannel() throws IOException {
+               return FileChannel.open(TMP_FOLDER.newFile().toPath(),
+                               StandardOpenOption.CREATE, 
StandardOpenOption.READ, StandardOpenOption.WRITE);
+       }
+
+       private static Buffer createTestBuffer() {
+               return 
BufferBuilderTestUtils.buildBufferWithAscendingInts(1024, 200, 0);
+       }
+
+       private static void validateTestBuffer(Buffer buffer) {
+               BufferBuilderTestUtils.validateBufferWithAscendingInts(buffer, 
200, 0);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java
deleted file mode 100644
index 55fa496..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Reader;
-import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer;
-
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-/**
- * Tests for the {@link BufferToByteBuffer}.
- */
-public class BufferToByteBufferTest {
-
-       @Test
-       public void testCompleteIsSameBufferAsOriginal() {
-               final ByteBuffer bb = ByteBuffer.allocateDirect(128);
-               final BufferToByteBuffer.Writer writer = new 
BufferToByteBuffer.Writer(bb);
-
-               final ByteBuffer result = writer.complete();
-
-               assertSame(bb, result);
-       }
-
-       @Test
-       public void testWriteReadMatchesCapacity() {
-               final ByteBuffer bb = ByteBuffer.allocateDirect(1200);
-               testWriteAndReadMultipleBuffers(bb, 100);
-       }
-
-       @Test
-       public void testWriteReadWithLeftoverCapacity() {
-               final ByteBuffer bb = ByteBuffer.allocateDirect(1177);
-               testWriteAndReadMultipleBuffers(bb, 100);
-       }
-
-       private void testWriteAndReadMultipleBuffers(ByteBuffer buffer, int 
numIntsPerBuffer) {
-               final Writer writer = new Writer(buffer);
-
-               int numBuffers = 0;
-               while 
(writer.writeBuffer(BufferBuilderTestUtils.buildBufferWithAscendingInts(1024, 
numIntsPerBuffer, 0))) {
-                       numBuffers++;
-               }
-
-               final ByteBuffer bb = writer.complete().slice();
-
-               final Reader reader = new Reader(bb);
-               Buffer buf;
-               while ((buf = reader.sliceNextBuffer()) != null) {
-                       
BufferBuilderTestUtils.validateBufferWithAscendingInts(buf, numIntsPerBuffer, 
0);
-                       numBuffers--;
-               }
-
-               assertEquals(0, numBuffers);
-       }
-
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
new file mode 100644
index 0000000..dd1c22b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.junit.AssumptionViolatedException;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Tests that read the BoundedBlockingSubpartition with multiple threads in 
parallel.
+ */
+public class FileChannelBoundedDataTest extends BoundedDataTestBase {
+
+       @Override
+       protected BoundedData createBoundedData(Path tempFilePath) throws 
IOException {
+               return FileChannelBoundedData.create(tempFilePath, BUFFER_SIZE);
+       }
+
+       @Override
+       protected BoundedData createBoundedDataWithRegion(Path tempFilePath, 
int regionSize) throws IOException {
+               throw new AssumptionViolatedException("FileChannelBoundedData 
is not region based");
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedDataTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedDataTest.java
new file mode 100644
index 0000000..0fe1f42
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedDataTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Tests that read the BoundedBlockingSubpartition with multiple threads in 
parallel.
+ */
+public class FileChannelMemoryMappedBoundedDataTest extends 
BoundedDataTestBase {
+
+       @Override
+       protected BoundedData createBoundedData(Path tempFilePath) throws 
IOException {
+               return FileChannelMemoryMappedBoundedData.create(tempFilePath);
+       }
+
+       @Override
+       protected BoundedData createBoundedDataWithRegion(Path tempFilePath, 
int regionSize) throws IOException {
+               return 
FileChannelMemoryMappedBoundedData.createWithRegionSize(tempFilePath, 
regionSize);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedDataTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedDataTest.java
new file mode 100644
index 0000000..18c5823
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedDataTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+/**
+ * Tests that read the BoundedBlockingSubpartition with multiple threads in 
parallel.
+ */
+public class MemoryMappedBoundedDataTest extends BoundedDataTestBase {
+
+       @Override
+       protected BoundedData createBoundedData(Path tempFilePath) throws 
IOException {
+               return MemoryMappedBoundedData.create(tempFilePath);
+       }
+
+       @Override
+       protected BoundedData createBoundedDataWithRegion(Path tempFilePath, 
int regionSize) throws IOException {
+               return 
MemoryMappedBoundedData.createWithRegionSize(tempFilePath, regionSize);
+       }
+}

Reply via email to