This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d88882cc6db523a4fe2d70adad99bb66dc5d48c1 Author: Stephan Ewen <[email protected]> AuthorDate: Sat Jun 8 16:05:24 2019 +0200 [FLINK-12986] [network] Add different implementations to store/serve the data from the BoundedBlockingSubpartition This PR encapsulates the following steps: - introduce the BoundedData interface to abstract the way that buffers are stored / read as part of a bounded stream subpartition - Change BufferToByteBuffer from reader/writer to encoder util that works across byte buffers and byte channels so we can use it for both file-based implementations and memory-buffer based implementations - Add FileChannelBoundedData which writes the buffers to a file and reads them back from the file - Add FileChannelMemoryMappedBoundedData which writes the buffer to a file and maps that file into memory for reading - Add common tests for all implementations of BoundedData - Rename MemoryMappedBuffers to MemoryMappedBoundedData to be consistent with other names. - Instantiate new bounded data store implementations from ResultPartition - Use FILE_MMAP implementation on 64bit systems and FILE implementation on other systems --- .../org/apache/flink/util/MemoryArchitecture.java | 77 +++++++ .../apache/flink/util/MemoryArchitectureTest.java | 36 +++ .../io/network/buffer/BufferPoolFactory.java | 5 + .../io/network/buffer/NetworkBufferPool.java | 5 + .../partition/BoundedBlockingSubpartition.java | 73 +++--- .../BoundedBlockingSubpartitionReader.java | 22 +- .../partition/BoundedBlockingSubpartitionType.java | 75 +++++++ .../runtime/io/network/partition/BoundedData.java | 80 +++++++ .../network/partition/BufferReaderWriterUtil.java | 250 +++++++++++++++++++++ .../io/network/partition/BufferToByteBuffer.java | 130 ----------- .../network/partition/FileChannelBoundedData.java | 160 +++++++++++++ .../FileChannelMemoryMappedBoundedData.java | 210 +++++++++++++++++ ...edBuffers.java => MemoryMappedBoundedData.java} | 100 +++++---- 
.../network/partition/ResultPartitionFactory.java | 33 ++- .../partition/BoundedBlockingSubpartitionTest.java | 66 ++++-- .../BoundedBlockingSubpartitionWriteReadTest.java | 40 +++- ...edBuffersTest.java => BoundedDataTestBase.java} | 109 +++++---- .../partition/BufferReaderWriterUtilTest.java | 203 +++++++++++++++++ .../network/partition/BufferToByteBufferTest.java | 80 ------- .../partition/FileChannelBoundedDataTest.java | 40 ++++ .../FileChannelMemoryMappedBoundedDataTest.java | 38 ++++ .../partition/MemoryMappedBoundedDataTest.java | 38 ++++ 22 files changed, 1505 insertions(+), 365 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java b/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java new file mode 100755 index 0000000..3672921 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/MemoryArchitecture.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.util.Arrays; +import java.util.List; + +/** + * The memory architecture (32 bit / 64 bit) of the current process. 
+ * Note that this might be different than the actual operating system's architecture, for example + * when installing a 32 bit JRE in a 64 bit OS. + */ +public enum MemoryArchitecture { + + // constants here start with an underscore because Java identifier cannot start with a + // numeric character and alternatives like 'BIT_64' are not as readable + + /** + * 32 bit memory address size. + */ + _32_BIT, + + /** + * 64 bit memory address size. + */ + _64_BIT, + + /** + * Unknown architecture, could not be determined. + */ + UNKNOWN; + + // ------------------------------------------------------------------------ + + private static final MemoryArchitecture current = getInternal(); + + /** + * Gets the processor architecture of this process. + */ + public static MemoryArchitecture get() { + return current; + } + + private static MemoryArchitecture getInternal() { + // putting these into the method to avoid having objects on the heap that are not needed + // any more after initialization + final List<String> names64bit = Arrays.asList("amd64", "x86_64"); + final List<String> names32bit = Arrays.asList("x86", "i386", "i486", "i586", "i686"); + final String arch = System.getProperty("os.arch"); + + if (names64bit.contains(arch)) { + return _64_BIT; + } + else if (names32bit.contains(arch)) { + return _32_BIT; + } + else { + return UNKNOWN; + } + } +} diff --git a/flink-core/src/test/java/org/apache/flink/util/MemoryArchitectureTest.java b/flink-core/src/test/java/org/apache/flink/util/MemoryArchitectureTest.java new file mode 100755 index 0000000..92c829f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/MemoryArchitectureTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.junit.Test; + +import static org.junit.Assert.assertNotEquals; + +/** + * Tests for the {@link MemoryArchitecture}. + */ +public class MemoryArchitectureTest { + + @Test + public void testArchitectureNotUnknown() { + final MemoryArchitecture arch = MemoryArchitecture.get(); + + assertNotEquals(MemoryArchitecture.UNKNOWN, arch); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java old mode 100644 new mode 100755 index de4c8e09..2b9ce97 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java @@ -59,4 +59,9 @@ public interface BufferPoolFactory { */ void destroyBufferPool(BufferPool bufferPool) throws IOException; + /** + * Gets the size of the buffers in the buffer pools produced by this factory. 
+ */ + int getBufferSize(); + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java old mode 100644 new mode 100755 index 51c6aa1..0247ab7 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -121,6 +121,11 @@ public class NetworkBufferPool implements BufferPoolFactory, MemorySegmentProvid allocatedMb, availableMemorySegments.size(), segmentSize); } + @Override + public int getBufferSize() { + return memorySegmentSize; + } + @Nullable public MemorySegment requestMemorySegment() { return availableMemorySegments.poll(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java old mode 100644 new mode 100755 index 756b0af..7a74872 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -28,8 +27,8 @@ import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.util.HashSet; import java.util.Set; @@ -41,9 +40,8 @@ import static 
org.apache.flink.util.Preconditions.checkState; * in a blocking manner: The result is first produced, then consumed. * The result can be consumed possibly multiple times. * - * <p>The implementation creates a temporary memory mapped file and writes all buffers to that - * memory and serves the result from that memory. The kernel backs the mapped memory region - * with physical memory and file space incrementally as new pages are filled. + * <p>Depending on the supplied implementation of {@link BoundedData}, the actual data is stored + * for example in a file, or in a temporary memory mapped file. * * <h2>Important Notes on Thread Safety</h2> * @@ -57,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkState; * <p>The implementation supports multiple concurrent readers, but assumes a single * thread per reader. That same thread must also release the reader. In particular, after the reader * was released, no buffers obtained from this reader may be accessed any more, or segmentation - * faults might occur. + * faults might occur in some implementations. * * <p>The method calls to create readers, dispose readers, and dispose the partition are * thread-safe vis-a-vis each other. @@ -71,8 +69,8 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { @Nullable private BufferConsumer currentBuffer; - /** The memory that we store the data in, via a memory mapped file. */ - private final MemoryMappedBuffers data; + /** The bounded data store that we store the data in. */ + private final BoundedData data; /** All created and not yet released readers. */ @GuardedBy("lock") @@ -90,25 +88,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { /** Flag indicating whether the subpartition has been released. */ private boolean isReleased; - /** - * Common constructor. 
- */ public BoundedBlockingSubpartition( int index, ResultPartition parent, - Path filePath) throws IOException { - - this(index, parent, MemoryMappedBuffers.create(filePath)); - } - - /** - * Constructor for testing, to pass in custom MemoryMappedBuffers. - */ - @VisibleForTesting - BoundedBlockingSubpartition( - int index, - ResultPartition parent, - MemoryMappedBuffers data) throws IOException { + BoundedData data) { super(index, parent); @@ -215,7 +198,7 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { availability.notifyDataAvailable(); - final MemoryMappedBuffers.BufferSlicer dataReader = data.getFullBuffers(); + final BoundedData.Reader dataReader = data.createReader(); final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader( this, dataReader, numDataBuffersWritten); readers.add(reader); @@ -271,4 +254,44 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { int getBuffersInBacklog() { return numDataBuffersWritten; } + + // ---------------------------- factories -------------------------------- + + /** + * Creates a BoundedBlockingSubpartition that simply stores the partition data in a file. + * Data is eagerly spilled (written to disk) and readers directly read from the file. + */ + public static BoundedBlockingSubpartition createWithFileChannel( + int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException { + + final FileChannelBoundedData bd = FileChannelBoundedData.create(tempFile.toPath(), readBufferSize); + return new BoundedBlockingSubpartition(index, parent, bd); + } + + /** + * Creates a BoundedBlockingSubpartition that stores the partition data in memory mapped file. + * Data is written to and read from the mapped memory region. Disk spilling happens lazily, when the + * OS swaps out the pages from the memory mapped file. 
+ */ + public static BoundedBlockingSubpartition createWithMemoryMappedFile( + int index, ResultPartition parent, File tempFile) throws IOException { + + final MemoryMappedBoundedData bd = MemoryMappedBoundedData.create(tempFile.toPath()); + return new BoundedBlockingSubpartition(index, parent, bd); + + } + + /** + * Creates a BoundedBlockingSubpartition that stores the partition data in a file and + * memory maps that file for reading. + * Data is eagerly spilled (written to disk) and then mapped into memory. The main + * difference to the {@link #createWithMemoryMappedFile(int, ResultPartition, File)} variant + * is that no I/O is necessary when pages from the memory mapped file are evicted. + */ + public static BoundedBlockingSubpartition createWithFileAndMemoryMappedReader( + int index, ResultPartition parent, File tempFile) throws IOException { + + final FileChannelMemoryMappedBoundedData bd = FileChannelMemoryMappedBoundedData.create(tempFile.toPath()); + return new BoundedBlockingSubpartition(index, parent, bd); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java old mode 100644 new mode 100755 index 07a999d..f7536b9 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer; import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import org.apache.flink.util.IOUtils; import javax.annotation.Nullable; @@ -44,7 +44,7 @@ final class 
BoundedBlockingSubpartitionReader implements ResultSubpartitionView /** The reader/decoder to the memory mapped region with the data we currently read from. * Null once the reader empty or disposed.*/ @Nullable - private BufferSlicer data; + private BoundedData.Reader dataReader; /** The remaining number of data buffers (not events) in the result. */ private int dataBufferBacklog; @@ -57,16 +57,16 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView */ BoundedBlockingSubpartitionReader( BoundedBlockingSubpartition parent, - BufferSlicer data, - int numDataBuffers) { + BoundedData.Reader dataReader, + int numDataBuffers) throws IOException { checkArgument(numDataBuffers >= 0); this.parent = checkNotNull(parent); - this.data = checkNotNull(data); + this.dataReader = checkNotNull(dataReader); this.dataBufferBacklog = numDataBuffers; - this.nextBuffer = data.sliceNextBuffer(); + this.nextBuffer = dataReader.nextBuffer(); } @Nullable @@ -83,8 +83,8 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView dataBufferBacklog--; } - assert data != null; - nextBuffer = data.sliceNextBuffer(); + assert dataReader != null; + nextBuffer = dataReader.nextBuffer(); return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog); } @@ -104,9 +104,11 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView // it is not a problem if this method executes multiple times isReleased = true; - // nulling these fields means thet read method and will fail fast + IOUtils.closeQuietly(dataReader); + + // nulling these fields means the read method and will fail fast nextBuffer = null; - data = null; + dataReader = null; // Notify the parent that this one is released. 
This allows the parent to // eventually release all resources (when all readers are done and the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java new file mode 100644 index 0000000..9b43264 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import java.io.File; +import java.io.IOException; + +/** + * The type of the BoundedBlockingSubpartition. Also doubles as the factory. + */ +public enum BoundedBlockingSubpartitionType { + + /** + * A BoundedBlockingSubpartition type that simply stores the partition data in a file. + * Data is eagerly spilled (written to disk) and readers directly read from the file. 
+ */ + FILE { + + @Override + public BoundedBlockingSubpartition create(int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException { + return BoundedBlockingSubpartition.createWithFileChannel(index, parent, tempFile, readBufferSize); + } + }, + + /** + * A BoundedBlockingSubpartition type that stores the partition data in memory mapped file. + * Data is written to and read from the mapped memory region. + * Disk spilling happens lazily, when the OS swaps out the pages from the memory mapped file. + */ + MMAP { + + @Override + public BoundedBlockingSubpartition create(int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException { + return BoundedBlockingSubpartition.createWithMemoryMappedFile(index, parent, tempFile); + } + }, + + /** + * Creates a BoundedBlockingSubpartition that stores the partition data in a file and + * memory maps that file for reading. + * Data is eagerly spilled (written to disk) and then mapped into memory. The main + * difference to the {@link BoundedBlockingSubpartitionType#MMAP} variant + * is that no I/O is necessary when pages from the memory mapped file are evicted. + */ + FILE_MMAP { + + @Override + public BoundedBlockingSubpartition create(int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException { + return BoundedBlockingSubpartition.createWithFileAndMemoryMappedReader(index, parent, tempFile); + } + }; + + // ------------------------------------------------------------------------ + + /** + * Creates BoundedBlockingSubpartition of this type. 
+ */ + public abstract BoundedBlockingSubpartition create(int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java new file mode 100755 index 0000000..4d58cf8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; + +/** + * BoundedData is the data store in a single bounded blocking subpartition. + * + * <h2>Life cycle</h2> + * + * <p>The BoundedData is first created during the "write phase" by writing a sequence of buffers + * through the {@link #writeBuffer(Buffer)} method. + * The write phase is ended by calling {@link #finishWrite()}. 
+ * After the write phase is finished, the data can be read multiple times through readers created + * via {@link #createReader()}. + * Finally, the BoundedData is dropped / deleted by calling {@link #close()}. + * + * <h2>Thread Safety and Concurrency</h2> + * + * <p>The implementations generally make no assumptions about thread safety. + * The only contract is that multiple created readers must be able to work independently concurrently. + */ +interface BoundedData extends Closeable { + + /** + * Writes this buffer to the bounded data. + * This call fails if the writing phase was already finished via {@link #finishWrite()}. + */ + void writeBuffer(Buffer buffer) throws IOException; + + /** + * Finishes the current region and prevents further writes. + * After calling this method, further calls to {@link #writeBuffer(Buffer)} will fail. + */ + void finishWrite() throws IOException; + + /** + * Gets a reader for the bounded data. Multiple readers may be created. + * This call only succeeds once the write phase was finished via {@link #finishWrite()}. + */ + BoundedData.Reader createReader() throws IOException; + + /** + * Gets the number of bytes of all written data (including the metadata in the buffer headers). + */ + long getSize(); + + // ------------------------------------------------------------------------ + + /** + * A reader to the bounded data. + */ + interface Reader extends Closeable { + + @Nullable + Buffer nextBuffer() throws IOException; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java new file mode 100755 index 0000000..445df55 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; + +/** + * Putting and getting of a sequence of buffers to/from a FileChannel or a ByteBuffer. + * This class handles the headers, length encoding, memory slicing. + * + * <p>The encoding is the same across FileChannel and ByteBuffer, so this class can + * write to a file and read from the byte buffer that results from mapping this file to memory. 
+ */ +final class BufferReaderWriterUtil { + + static final int HEADER_LENGTH = 8; + + static final int HEADER_VALUE_IS_BUFFER = 0; + + static final int HEADER_VALUE_IS_EVENT = 1; + + // ------------------------------------------------------------------------ + // ByteBuffer read / write + // ------------------------------------------------------------------------ + + static boolean writeBuffer(Buffer buffer, ByteBuffer memory) { + final int bufferSize = buffer.getSize(); + + if (memory.remaining() < bufferSize + HEADER_LENGTH) { + return false; + } + + memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); + memory.putInt(bufferSize); + memory.put(buffer.getNioBufferReadable()); + return true; + } + + @Nullable + static Buffer sliceNextBuffer(ByteBuffer memory) { + final int remaining = memory.remaining(); + + // we only check the correct case where data is exhausted + // all other cases can only occur if our write logic is wrong and will already throw + // buffer underflow exceptions which will cause the read to fail. + if (remaining == 0) { + return null; + } + + final int header = memory.getInt(); + final int size = memory.getInt(); + + memory.limit(memory.position() + size); + ByteBuffer buf = memory.slice(); + memory.position(memory.limit()); + memory.limit(memory.capacity()); + + MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf); + + return bufferFromMemorySegment( + memorySegment, + FreeingBufferRecycler.INSTANCE, + size, + header == HEADER_VALUE_IS_EVENT); + } + + // ------------------------------------------------------------------------ + // ByteChannel read / write + // ------------------------------------------------------------------------ + + static long writeToByteChannel( + FileChannel channel, + Buffer buffer, + ByteBuffer[] arrayWithHeaderBuffer) throws IOException { + + final ByteBuffer headerBuffer = arrayWithHeaderBuffer[0]; + headerBuffer.clear(); + headerBuffer.putInt(buffer.isBuffer() ? 
HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); + headerBuffer.putInt(buffer.getSize()); + headerBuffer.flip(); + + final ByteBuffer dataBuffer = buffer.getNioBufferReadable(); + arrayWithHeaderBuffer[1] = dataBuffer; + + final long bytesExpected = HEADER_LENGTH + dataBuffer.remaining(); + + // The file channel implementation guarantees that all bytes are written when invoked + // because it is a blocking channel (the implementation mentioned it as guaranteed). + // However, the api docs leaves it somewhat open, so it seems to be an undocumented contract in the JRE. + // We build this safety net to be on the safe side. + if (bytesExpected < channel.write(arrayWithHeaderBuffer)) { + writeBuffers(channel, arrayWithHeaderBuffer); + } + return bytesExpected; + } + + static long writeToByteChannelIfBelowSize( + FileChannel channel, + Buffer buffer, + ByteBuffer[] arrayWithHeaderBuffer, + long bytesLeft) throws IOException { + + if (bytesLeft >= HEADER_LENGTH + buffer.getSize()) { + return writeToByteChannel(channel, buffer, arrayWithHeaderBuffer); + } + + return -1L; + } + + @Nullable + static Buffer readFromByteChannel( + FileChannel channel, + ByteBuffer headerBuffer, + MemorySegment memorySegment, + BufferRecycler bufferRecycler) throws IOException { + + headerBuffer.clear(); + if (!tryReadByteBuffer(channel, headerBuffer)) { + return null; + } + headerBuffer.flip(); + + final ByteBuffer targetBuf; + final int header; + final int size; + + try { + header = headerBuffer.getInt(); + size = headerBuffer.getInt(); + targetBuf = memorySegment.wrap(0, size); + } + catch (BufferUnderflowException | IllegalArgumentException e) { + // buffer underflow if header buffer is undersized + // IllegalArgumentException if size is outside memory segment size + throwCorruptDataException(); + return null; // silence compiler + } + + readByteBufferFully(channel, targetBuf); + + return bufferFromMemorySegment(memorySegment, bufferRecycler, size, header == HEADER_VALUE_IS_EVENT); + } + + 
static ByteBuffer allocatedHeaderBuffer() { + ByteBuffer bb = ByteBuffer.allocateDirect(HEADER_LENGTH); + configureByteBuffer(bb); + return bb; + } + + static ByteBuffer[] allocatedWriteBufferArray() { + return new ByteBuffer[] { allocatedHeaderBuffer(), null }; + } + + private static boolean tryReadByteBuffer(FileChannel channel, ByteBuffer b) throws IOException { + if (channel.read(b) == -1) { + return false; + } + else { + while (b.hasRemaining()) { + if (channel.read(b) == -1) { + throwPrematureEndOfFile(); + } + } + return true; + } + } + + private static void readByteBufferFully(FileChannel channel, ByteBuffer b) throws IOException { + // the post-checked loop here gets away with one less check in the normal case + do { + if (channel.read(b) == -1) { + throwPrematureEndOfFile(); + } + } + while (b.hasRemaining()); + } + + private static void writeBuffer(FileChannel channel, ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + channel.write(buffer); + } + } + + private static void writeBuffers(FileChannel channel, ByteBuffer... 
buffers) throws IOException { + for (ByteBuffer buffer : buffers) { + writeBuffer(channel, buffer); + } + } + + private static void throwPrematureEndOfFile() throws IOException { + throw new IOException("The spill file is corrupt: premature end of file"); + } + + private static void throwCorruptDataException() throws IOException { + throw new IOException("The spill file is corrupt: buffer size and boundaries invalid"); + } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + static Buffer bufferFromMemorySegment( + MemorySegment memorySegment, + BufferRecycler memorySegmentRecycler, + int size, + boolean isEvent) { + + final Buffer buffer = new NetworkBuffer(memorySegment, memorySegmentRecycler); + buffer.setSize(size); + + if (isEvent) { + buffer.tagAsEvent(); + } + + return buffer; + } + + static void configureByteBuffer(ByteBuffer buffer) { + buffer.order(ByteOrder.nativeOrder()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java deleted file mode 100644 index 4cdf41a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferToByteBuffer.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.partition; - -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; -import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; - -import javax.annotation.Nullable; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * Putting and getting of a sequence of buffers to/from a ByteBuffer. - * This class handles the headers, length encoding, memory slicing. 
- */ -final class BufferToByteBuffer { - - // all fields and methods below here have package-private access to avoid bridge - // methods when accessing them from the nested classes - - static final int HEADER_LENGTH = 8; - - static final int HEADER_VALUE_IS_BUFFER = 0; - - static final int HEADER_VALUE_IS_EVENT = 1; - - static ByteBuffer checkAndConfigureByteBuffer(ByteBuffer buffer) { - checkArgument(buffer.position() == 0); - checkArgument(buffer.capacity() > 8); - checkArgument(buffer.limit() == buffer.capacity()); - - return buffer.order(ByteOrder.nativeOrder()); - } - - // ------------------------------------------------------------------------ - - static final class Writer { - - private final ByteBuffer memory; - - Writer(ByteBuffer memory) { - this.memory = checkAndConfigureByteBuffer(memory); - } - - public boolean writeBuffer(Buffer buffer) { - final ByteBuffer memory = this.memory; - final int bufferSize = buffer.getSize(); - - if (memory.remaining() < bufferSize + HEADER_LENGTH) { - return false; - } - - memory.putInt(buffer.isBuffer() ? HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT); - memory.putInt(bufferSize); - memory.put(buffer.getNioBufferReadable()); - return true; - } - - public ByteBuffer complete() { - memory.flip(); - return memory; - } - - public int getNumBytes() { - return memory.position(); - } - } - - static final class Reader { - - private final ByteBuffer memory; - - Reader(ByteBuffer memory) { - this.memory = checkAndConfigureByteBuffer(memory); - } - - @Nullable - public Buffer sliceNextBuffer() { - final ByteBuffer memory = this.memory; - final int remaining = memory.remaining(); - - // we only check the correct case where data is exhausted - // all other cases can only occur if our write logic is wrong and will already throw - // buffer underflow exceptions which will cause the read to fail. 
- if (remaining == 0) { - return null; - } - - final int header = memory.getInt(); - final int size = memory.getInt(); - - memory.limit(memory.position() + size); - ByteBuffer buf = memory.slice(); - memory.position(memory.limit()); - memory.limit(memory.capacity()); - - MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf); - Buffer buffer = new NetworkBuffer(memorySegment, FreeingBufferRecycler.INSTANCE); - buffer.setSize(size); - - if (header == HEADER_VALUE_IS_EVENT) { - buffer.tagAsEvent(); - } - - return buffer; - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java new file mode 100644 index 0000000..50dca60 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayDeque; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An implementation of {@link BoundedData} that writes directly into a File Channel. + * The readers are simple file channel readers using a simple dedicated buffer pool. + */ +final class FileChannelBoundedData implements BoundedData { + + private final Path filePath; + + private final FileChannel fileChannel; + + private final ByteBuffer[] headerAndBufferArray; + + private long size; + + private final int memorySegmentSize; + + FileChannelBoundedData( + Path filePath, + FileChannel fileChannel, + int memorySegmentSize) { + + this.filePath = checkNotNull(filePath); + this.fileChannel = checkNotNull(fileChannel); + this.memorySegmentSize = memorySegmentSize; + this.headerAndBufferArray = BufferReaderWriterUtil.allocatedWriteBufferArray(); + } + + @Override + public void writeBuffer(Buffer buffer) throws IOException { + size += BufferReaderWriterUtil.writeToByteChannel(fileChannel, buffer, headerAndBufferArray); + } + + @Override + public void finishWrite() throws IOException { + fileChannel.close(); + } + + @Override + public Reader createReader() throws IOException { + checkState(!fileChannel.isOpen()); + + final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ); + return new FileBufferReader(fc, 
memorySegmentSize); + } + + @Override + public long getSize() { + return size; + } + + @Override + public void close() throws IOException { + IOUtils.closeQuietly(fileChannel); + Files.delete(filePath); + } + + // ------------------------------------------------------------------------ + + public static FileChannelBoundedData create(Path filePath, int memorySegmentSize) throws IOException { + final FileChannel fileChannel = FileChannel.open( + filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + + return new FileChannelBoundedData( + filePath, + fileChannel, + memorySegmentSize); + } + + // ------------------------------------------------------------------------ + + static final class FileBufferReader implements BoundedData.Reader, BufferRecycler { + + private static final int NUM_BUFFERS = 2; + + private final FileChannel fileChannel; + + private final ByteBuffer headerBuffer; + + private final ArrayDeque<MemorySegment> buffers; + + FileBufferReader(FileChannel fileChannel, int bufferSize) { + this.fileChannel = checkNotNull(fileChannel); + this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer(); + this.buffers = new ArrayDeque<>(NUM_BUFFERS); + + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers.addLast(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null)); + } + } + + @Nullable + @Override + public Buffer nextBuffer() throws IOException { + final MemorySegment memory = buffers.pollFirst(); + + if (memory != null) { + final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this); + if (next != null) { + return next; + } + else { + recycle(memory); + return null; + } + } + + throw new IOException("Bug in BoundedBlockingSubpartition with FILE data: " + + "Requesting new buffer before previous buffer returned."); + } + + @Override + public void close() throws IOException { + fileChannel.close(); + } + + @Override + public void recycle(MemorySegment memorySegment) { + 
buffers.addLast(memorySegment); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java new file mode 100755 index 0000000..4a71fcd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.util.IOUtils; + +import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An implementation of {@link BoundedData} that writes directly into a File Channel + * and maps the file into memory after writing. Readers simply access the memory mapped + * data. All readers access the same memory, which is mapped in a read-only manner. + * + * <p>Similarly to the {@link MemoryMappedBoundedData}, this implementation needs to work around + * the fact that the memory mapped regions cannot exceed 2GB in Java. While the implementation writes + * to a single file, the result may be multiple memory mapped buffers. + * + * <h2>Important!</h2> + * + * <p>This class performs absolutely no synchronization and relies on single threaded access + * or externally synchronized access. Concurrent access around disposal may cause + * segmentation faults! + */ +final class FileChannelMemoryMappedBoundedData implements BoundedData { + + /** The file channel backing the memory mapped file. */ + private final FileChannel fileChannel; + + /** The reusable array with header buffer and data buffer, to use gathering writes on the + * file channel ({@link java.nio.channels.GatheringByteChannel#write(ByteBuffer[])}). 
*/ + private final ByteBuffer[] headerAndBufferArray; + + /** All memory mapped regions. */ + private final ArrayList<ByteBuffer> memoryMappedRegions; + + /** The path of the memory mapped file. */ + private final Path filePath; + + /** The position in the file channel. Cached for efficiency, because an actual position + * lookup in the channel involves various locks and checks. */ + private long pos; + + /** The position where the current memory mapped region must end. */ + private long endOfCurrentRegion; + + /** The position where the current memory mapped region started. */ + private long startOfCurrentRegion; + + /** The maximum size of each mapped region. */ + private final long maxRegionSize; + + FileChannelMemoryMappedBoundedData( + Path filePath, + FileChannel fileChannel, + int maxSizePerMappedRegion) { + + this.filePath = filePath; + this.fileChannel = fileChannel; + this.headerAndBufferArray = BufferReaderWriterUtil.allocatedWriteBufferArray(); + this.memoryMappedRegions = new ArrayList<>(4); + this.maxRegionSize = maxSizePerMappedRegion; + this.endOfCurrentRegion = maxSizePerMappedRegion; + } + + @Override + public void writeBuffer(Buffer buffer) throws IOException { + if (tryWriteBuffer(buffer)) { + return; + } + + mapRegionAndStartNext(); + + if (!tryWriteBuffer(buffer)) { + throwTooLargeBuffer(buffer); + } + } + + private boolean tryWriteBuffer(Buffer buffer) throws IOException { + final long spaceLeft = endOfCurrentRegion - pos; + final long bytesWritten = BufferReaderWriterUtil.writeToByteChannelIfBelowSize( + fileChannel, buffer, headerAndBufferArray, spaceLeft); + + if (bytesWritten >= 0) { + pos += bytesWritten; + return true; + } + else { + return false; + } + } + + @Override + public BoundedData.Reader createReader() { + checkState(!fileChannel.isOpen()); + + final List<ByteBuffer> buffers = memoryMappedRegions.stream() + .map((bb) -> bb.duplicate().order(ByteOrder.nativeOrder())) + .collect(Collectors.toList()); + + return new 
MemoryMappedBoundedData.BufferSlicer(buffers); + } + + /** + * Finishes the current region and prevents further writes. + * After calling this method, further calls to {@link #writeBuffer(Buffer)} will fail. + */ + @Override + public void finishWrite() throws IOException { + mapRegionAndStartNext(); + fileChannel.close(); + } + + /** + * Closes the file and unmaps all memory mapped regions. + * After calling this method, access to any ByteBuffer obtained from this instance + * will cause a segmentation fault. + */ + public void close() throws IOException { + IOUtils.closeQuietly(fileChannel); + + for (ByteBuffer bb : memoryMappedRegions) { + PlatformDependent.freeDirectBuffer(bb); + } + memoryMappedRegions.clear(); + + // To make this compatible with all versions of Windows, we must wait with + // deleting the file until it is unmapped. + // See also https://stackoverflow.com/questions/11099295/file-flag-delete-on-close-and-memory-mapped-files/51649618#51649618 + + Files.delete(filePath); + } + + @Override + public long getSize() { + return pos; + } + + private void mapRegionAndStartNext() throws IOException { + final ByteBuffer region = fileChannel.map(MapMode.READ_ONLY, startOfCurrentRegion, pos - startOfCurrentRegion); + region.order(ByteOrder.nativeOrder()); + memoryMappedRegions.add(region); + + startOfCurrentRegion = pos; + endOfCurrentRegion = startOfCurrentRegion + maxRegionSize; + } + + private void throwTooLargeBuffer(Buffer buffer) throws IOException { + throw new IOException(String.format( + "The buffer (%d bytes) is larger than the maximum size of a memory buffer (%d bytes)", + buffer.getSize(), maxRegionSize)); + } + + // ------------------------------------------------------------------------ + // Factories + // ------------------------------------------------------------------------ + + /** + * Creates new FileChannelMemoryMappedBoundedData, creating a memory mapped file at the given path. 
+ */ + public static FileChannelMemoryMappedBoundedData create(Path memMappedFilePath) throws IOException { + return createWithRegionSize(memMappedFilePath, Integer.MAX_VALUE); + } + + /** + * Creates new FileChannelMemoryMappedBoundedData, creating a memory mapped file at the given path. + * Each mapped region (= ByteBuffer) will be of the given size. + */ + public static FileChannelMemoryMappedBoundedData createWithRegionSize(Path memMappedFilePath, int regionSize) throws IOException { + checkNotNull(memMappedFilePath, "memMappedFilePath"); + checkArgument(regionSize > 0, "region size must be > 0"); + + final FileChannel fileChannel = FileChannel.open(memMappedFilePath, + StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); + + return new FileChannelMemoryMappedBoundedData(memMappedFilePath, fileChannel, regionSize); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java old mode 100644 new mode 100755 similarity index 73% rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java index f2973b8..502c64c --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java @@ -19,16 +19,15 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer; import org.apache.flink.util.IOUtils; import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; import javax.annotation.Nullable; -import java.io.Closeable; import 
java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.nio.file.Files; @@ -42,24 +41,14 @@ import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; /** - * This class is largely a workaround for the fact that a memory mapped region in Java can cannot + * An implementation of {@link BoundedData} simply through ByteBuffers backed by memory, typically + * from a memory mapped file, so the data gets automatically evicted from memory if it grows large. + * + * <p>Most of the code in this class is a workaround for the fact that a memory mapped region in Java cannot * be larger than 2GB (== signed 32 bit int max value). The class takes {@link Buffer Buffers} and - * writes them to several memory mapped region, using the {@link BufferToByteBuffer} + * writes them to several memory mapped region, using the {@link BufferReaderWriterUtil} * class. * - * <h2>Usage</h2> - * - * <p>The class assumes in the first phase that data is written by repeatedly calling - * {@link #writeBuffer(Buffer)}. That puts the data into the memory region of the memory - * mapped file. After writing, one must call {@link #finishWrite()}. - * - * <p>After that, the class can produce multiple {@link BufferSlicer} instances to re-read - * the data from the memory regions. Multiple slicers can read concurrently, but each slicer - * should be read from by a single thread. - * - * <p>Eventually, the resources must be disposed via {@link #close()}. After that, - * no reading can happen any more. - * * <h2>Important!</h2> * * <p>This class performs absolutely no synchronization and relies on single threaded access @@ -67,18 +56,18 @@ import static org.apache.flink.util.Preconditions.checkArgument; * segmentation faults! 
* * <p>This class does limited sanity checks and assumes correct use from {@link BoundedBlockingSubpartition} - * and {@link BoundedBlockingSubpartitionReader}, such as writing first and rading after. + * and {@link BoundedBlockingSubpartitionReader}, such as writing first and reading after. * Not obeying these contracts throws NullPointerExceptions. */ -class MemoryMappedBuffers implements Closeable { +final class MemoryMappedBoundedData implements BoundedData { /** Memory mappings should be at the granularity of page sizes, for efficiency. */ private static final int PAGE_SIZE = PageSizeUtil.getSystemPageSizeOrConservativeMultiple(); - /** The encoder to the current memory mapped region we are writing to. + /** The current memory mapped region we are writing to. * This value is null once writing has finished or the buffers are disposed. */ @Nullable - private BufferToByteBuffer.Writer currentBuffer; + private ByteBuffer currentBuffer; /** All memory mapped regions that are already full (completed). */ private final ArrayList<ByteBuffer> fullBuffers; @@ -95,7 +84,7 @@ class MemoryMappedBuffers implements Closeable { /** The size of each mapped region. 
*/ private final long mappingSize; - MemoryMappedBuffers( + MemoryMappedBoundedData( Path filePath, FileChannel fileChannel, int maxSizePerByteBuffer) throws IOException { @@ -108,25 +97,27 @@ class MemoryMappedBuffers implements Closeable { rollOverToNextBuffer(); } - void writeBuffer(Buffer buffer) throws IOException { + @Override + public void writeBuffer(Buffer buffer) throws IOException { assert currentBuffer != null; - if (currentBuffer.writeBuffer(buffer)) { + if (BufferReaderWriterUtil.writeBuffer(buffer, currentBuffer)) { return; } rollOverToNextBuffer(); - if (!currentBuffer.writeBuffer(buffer)) { + if (!BufferReaderWriterUtil.writeBuffer(buffer, currentBuffer)) { throwTooLargeBuffer(buffer); } } - BufferSlicer getFullBuffers() { + @Override + public BufferSlicer createReader() { assert currentBuffer == null; final List<ByteBuffer> buffers = fullBuffers.stream() - .map(ByteBuffer::slice) + .map((bb) -> bb.slice().order(ByteOrder.nativeOrder())) .collect(Collectors.toList()); return new BufferSlicer(buffers); @@ -136,10 +127,12 @@ class MemoryMappedBuffers implements Closeable { * Finishes the current region and prevents further writes. * After calling this method, further calls to {@link #writeBuffer(Buffer)} will fail. 
*/ - void finishWrite() throws IOException { + @Override + public void finishWrite() throws IOException { assert currentBuffer != null; - fullBuffers.add(currentBuffer.complete()); + currentBuffer.flip(); + fullBuffers.add(currentBuffer); currentBuffer = null; // fail further writes fast file.close(); // won't map further regions from now on } @@ -158,7 +151,7 @@ class MemoryMappedBuffers implements Closeable { fullBuffers.clear(); if (currentBuffer != null) { - PlatformDependent.freeDirectBuffer(currentBuffer.complete()); + PlatformDependent.freeDirectBuffer(currentBuffer); currentBuffer = null; } @@ -172,13 +165,14 @@ class MemoryMappedBuffers implements Closeable { /** * Gets the number of bytes of all written data (including the metadata in the buffer headers). */ - long getSize() { + @Override + public long getSize() { long size = 0L; for (ByteBuffer bb : fullBuffers) { size += bb.remaining(); } if (currentBuffer != null) { - size += currentBuffer.getNumBytes(); + size += currentBuffer.position(); } return size; } @@ -187,11 +181,12 @@ class MemoryMappedBuffers implements Closeable { if (currentBuffer != null) { // we need to remember the original buffers, not any slices. // slices have no cleaner, which we need to trigger explicit unmapping - fullBuffers.add(currentBuffer.complete()); + currentBuffer.flip(); + fullBuffers.add(currentBuffer); } - final ByteBuffer mapped = file.map(MapMode.READ_WRITE, nextMappingOffset, mappingSize); - currentBuffer = new Writer(mapped); + currentBuffer = file.map(MapMode.READ_WRITE, nextMappingOffset, mappingSize); + currentBuffer.order(ByteOrder.nativeOrder()); nextMappingOffset += mappingSize; } @@ -220,11 +215,11 @@ class MemoryMappedBuffers implements Closeable { * The "reader" for the memory region. It slices a sequence of buffers from the * sequence of mapped ByteBuffers. 
*/ - static final class BufferSlicer { + static final class BufferSlicer implements BoundedData.Reader { - /** The reader/decoder to the memory mapped region with the data we currently read from. + /** The memory mapped region we currently read from. * Max 2GB large. Further regions may be in the {@link #furtherData} field. */ - private BufferToByteBuffer.Reader data; + private ByteBuffer currentData; /** Further byte buffers, to handle cases where there is more data than fits into * one mapped byte buffer (2GB = Integer.MAX_VALUE). */ @@ -232,16 +227,17 @@ class MemoryMappedBuffers implements Closeable { BufferSlicer(Iterable<ByteBuffer> data) { this.furtherData = data.iterator(); - this.data = new BufferToByteBuffer.Reader(furtherData.next()); + this.currentData = furtherData.next(); } + @Override @Nullable - public Buffer sliceNextBuffer() { + public Buffer nextBuffer() { // should only be null once empty or disposed, in which case this method // should not be called any more - assert data != null; + assert currentData != null; - final Buffer next = data.sliceNextBuffer(); + final Buffer next = BufferReaderWriterUtil.sliceNextBuffer(currentData); if (next != null) { return next; } @@ -250,8 +246,14 @@ class MemoryMappedBuffers implements Closeable { return null; } - data = new BufferToByteBuffer.Reader(furtherData.next()); - return sliceNextBuffer(); + currentData = furtherData.next(); + return nextBuffer(); + } + + @Override + public void close() throws IOException { + // nothing to do, this class holds no actual resources of its own, + // only references to the mapped byte buffers } } @@ -260,20 +262,20 @@ class MemoryMappedBuffers implements Closeable { // ------------------------------------------------------------------------ /** - * Creates new MemoryMappedBuffers, creating a memory mapped file at the given path. + * Creates new MemoryMappedBoundedData, creating a memory mapped file at the given path. 
*/ - public static MemoryMappedBuffers create(Path memMappedFilePath) throws IOException { + public static MemoryMappedBoundedData create(Path memMappedFilePath) throws IOException { return createWithRegionSize(memMappedFilePath, Integer.MAX_VALUE); } /** - * Creates new MemoryMappedBuffers, creating a memory mapped file at the given path. + * Creates new MemoryMappedBoundedData, creating a memory mapped file at the given path. * Each mapped region (= ByteBuffer) will be of the given size. */ - public static MemoryMappedBuffers createWithRegionSize(Path memMappedFilePath, int regionSize) throws IOException { + public static MemoryMappedBoundedData createWithRegionSize(Path memMappedFilePath, int regionSize) throws IOException { final FileChannel fileChannel = FileChannel.open(memMappedFilePath, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); - return new MemoryMappedBuffers(memMappedFilePath, fileChannel, regionSize); + return new MemoryMappedBoundedData(memMappedFilePath, fileChannel, regionSize); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java old mode 100644 new mode 100755 index 6fc5cfc..137ea5f --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.MemoryArchitecture; import org.apache.flink.util.function.FunctionWithException; import org.slf4j.Logger; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import 
javax.annotation.Nonnull; +import java.io.File; import java.io.IOException; import java.util.Optional; @@ -41,8 +43,11 @@ import java.util.Optional; * Factory for {@link ResultPartition} to use in {@link NettyShuffleEnvironment}. */ public class ResultPartitionFactory { + private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class); + private static final BoundedBlockingSubpartitionType BOUNDED_BLOCKING_TYPE = getBoundedBlockingType(); + @Nonnull private final ResultPartitionManager partitionManager; @@ -114,7 +119,7 @@ public class ResultPartitionFactory { partitionManager, bufferPoolFactory); - createSubpartitions(partition, type, subpartitions); + createSubpartitions(partition, type, subpartitions, this.bufferPoolFactory.getBufferSize()); LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this); @@ -122,12 +127,15 @@ public class ResultPartitionFactory { } private void createSubpartitions( - ResultPartition partition, ResultPartitionType type, ResultSubpartition[] subpartitions) { + ResultPartition partition, + ResultPartitionType type, + ResultSubpartition[] subpartitions, + int networkBufferSize) { // Create the subpartitions. 
switch (type) { case BLOCKING: - initializeBoundedBlockingPartitions(subpartitions, partition, ioManager); + initializeBoundedBlockingPartitions(subpartitions, partition, ioManager, networkBufferSize); break; case PIPELINED: @@ -146,13 +154,14 @@ public class ResultPartitionFactory { private static void initializeBoundedBlockingPartitions( ResultSubpartition[] subpartitions, ResultPartition parent, - IOManager ioManager) { + IOManager ioManager, + int networkBufferSize) { int i = 0; try { for (; i < subpartitions.length; i++) { - subpartitions[i] = new BoundedBlockingSubpartition( - i, parent, ioManager.createChannel().getPathFile().toPath()); + final File spillFile = ioManager.createChannel().getPathFile(); + subpartitions[i] = BOUNDED_BLOCKING_TYPE.create(i, parent, spillFile, networkBufferSize); } } catch (IOException e) { @@ -188,4 +197,16 @@ public class ResultPartitionFactory { type.hasBackPressure() ? Optional.empty() : Optional.of(p)); }; } + + private static BoundedBlockingSubpartitionType getBoundedBlockingType() { + switch (MemoryArchitecture.get()) { + case _64_BIT: + return BoundedBlockingSubpartitionType.FILE_MMAP; + case _32_BIT: + return BoundedBlockingSubpartitionType.FILE; + default: + LOG.warn("Cannot determine memory architecture. 
Using pure file-based shuffle."); + return BoundedBlockingSubpartitionType.FILE; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java index 5051b90..e76cd1e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java @@ -24,12 +24,12 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -59,12 +59,24 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { partition.release(); } + @Test + public void testClosingClosesBoundedData() throws Exception { + final TestingBoundedDataReader reader = new TestingBoundedDataReader(); + final BoundedBlockingSubpartitionReader bbspr = new BoundedBlockingSubpartitionReader( + (BoundedBlockingSubpartition) createSubpartition(), reader, 10); + + bbspr.releaseAllResources(); + + assertTrue(reader.closed); + } + // ------------------------------------------------------------------------ @Override ResultSubpartition createSubpartition() throws Exception { final ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING); - return new BoundedBlockingSubpartition(0, resultPartition, tmpPath()); + return BoundedBlockingSubpartition.createWithMemoryMappedFile( + 0, resultPartition, new File(TMP_DIR.newFolder(), "subpartition")); } @Override @@ -74,32 +86,50 @@ public class 
BoundedBlockingSubpartitionTest extends SubpartitionTestBase { return new BoundedBlockingSubpartition( 0, resultPartition, - FailingMemory.create()); + new FailingBoundedData()); } // ------------------------------------------------------------------------ - static Path tmpPath() throws IOException { - return new File(TMP_DIR.newFolder(), "subpartition").toPath(); - } + private static class FailingBoundedData implements BoundedData { - // ------------------------------------------------------------------------ + @Override + public void writeBuffer(Buffer buffer) throws IOException { + throw new IOException("test"); + } + + @Override + public void finishWrite() throws IOException { + throw new UnsupportedOperationException(); + } - private static class FailingMemory extends MemoryMappedBuffers { + @Override + public Reader createReader() throws IOException { + throw new UnsupportedOperationException(); + } - FailingMemory(Path path, FileChannel fc) throws IOException { - super(path, fc, Integer.MAX_VALUE); + @Override + public long getSize() { + throw new UnsupportedOperationException(); } @Override - void writeBuffer(Buffer buffer) throws IOException { - throw new IOException("test"); + public void close() {} + } + + private static class TestingBoundedDataReader implements BoundedData.Reader { + + boolean closed; + + @Nullable + @Override + public Buffer nextBuffer() throws IOException { + return null; } - static FailingMemory create() throws IOException { - Path p = tmpPath(); - FileChannel fc = FileChannel.open(p, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE); - return new FailingMemory(p, fc); + @Override + public void close() throws IOException { + closed = true; } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java index 
46dbb05..359ad8d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java @@ -27,10 +27,17 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAn import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -38,11 +45,32 @@ import static org.junit.Assert.assertTrue; /** * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. 
*/ +@RunWith(Parameterized.class) public class BoundedBlockingSubpartitionWriteReadTest { @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + private static final int BUFFER_SIZE = 1024 * 1024; + + // ------------------------------------------------------------------------ + // parameters + // ------------------------------------------------------------------------ + + @Parameters(name = "type = {0}") + public static Collection<Object[]> modes() { + return Arrays.stream(BoundedBlockingSubpartitionType.values()) + .map((type) -> new Object[] { type }) + .collect(Collectors.toList()); + } + + @Parameter + public BoundedBlockingSubpartitionType type; + + // ------------------------------------------------------------------------ + // tests + // ------------------------------------------------------------------------ + @Test public void testWriteAndReadData() throws Exception { final int numLongs = 15_000_000; // roughly 115 MiBytes @@ -118,6 +146,7 @@ public class BoundedBlockingSubpartitionWriteReadTest { assertEquals(expectedNextLong++, buffer.getLong()); } + next.buffer().recycleBuffer(); nextExpectedBacklog--; } @@ -130,7 +159,7 @@ public class BoundedBlockingSubpartitionWriteReadTest { // ------------------------------------------------------------------------ private static void writeLongs(BoundedBlockingSubpartition partition, long nums) throws IOException { - final MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(1024 * 1024); + final MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE); long l = 0; while (nums > 0) { @@ -149,18 +178,19 @@ public class BoundedBlockingSubpartitionWriteReadTest { } } - private static BoundedBlockingSubpartition createAndFillPartition(long numLongs) throws IOException { + private BoundedBlockingSubpartition createAndFillPartition(long numLongs) throws IOException { BoundedBlockingSubpartition subpartition = createSubpartition(); 
writeLongs(subpartition, numLongs); subpartition.finish(); return subpartition; } - private static BoundedBlockingSubpartition createSubpartition() throws IOException { - return new BoundedBlockingSubpartition( + private BoundedBlockingSubpartition createSubpartition() throws IOException { + return type.create( 0, PartitionTestUtils.createPartition(ResultPartitionType.BLOCKING), - new File(TMP_FOLDER.newFolder(), "partitiondata").toPath()); + new File(TMP_FOLDER.newFolder(), "partitiondata"), + BUFFER_SIZE); } private static LongReader[] createSubpartitionLongReaders( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java similarity index 53% rename from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java index eec7dba..bd7421c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBuffersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; -import org.apache.flink.runtime.io.network.partition.MemoryMappedBuffers.BufferSlicer; import org.hamcrest.Matchers; import org.junit.ClassRule; @@ -34,7 +33,6 @@ import java.nio.file.Path; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -42,114 +40,141 @@ import static 
org.junit.Assert.assertTrue; /** * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. */ -public class MemoryMappedBuffersTest { +public abstract class BoundedDataTestBase { @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + /** Max buffer sized used by the tests that write data. For implementations that need + * to instantiate buffers in the read side. */ + protected static final int BUFFER_SIZE = 1024 * 1024; // 1 MiByte + + // ------------------------------------------------------------------------ + // BoundedData Instantiation + // ------------------------------------------------------------------------ + + protected abstract BoundedData createBoundedData(Path tempFilePath) throws IOException; + + protected abstract BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException; + + private BoundedData createBoundedData() throws IOException { + return createBoundedData(createTempPath()); + } + + private BoundedData createBoundedDataWithRegion(int regionSize) throws IOException { + return createBoundedDataWithRegion(createTempPath(), regionSize); + } + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + @Test public void testWriteAndReadData() throws Exception { - testWriteAndReadData(10_000_000, Integer.MAX_VALUE); + try (BoundedData bd = createBoundedData()) { + testWriteAndReadData(bd); + } } @Test public void testWriteAndReadDataAcrossRegions() throws Exception { - testWriteAndReadData(10_000_000, 1_276_347); + try (BoundedData bd = createBoundedDataWithRegion(1_276_347)) { + testWriteAndReadData(bd); + } } - private static void testWriteAndReadData(int numInts, int regionSize) throws Exception { - try (MemoryMappedBuffers memory = MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) { - final int numBuffers = writeInts(memory, 
numInts); - memory.finishWrite(); + private void testWriteAndReadData(BoundedData bd) throws Exception { + final int numInts = 10_000_000; + final int numBuffers = writeInts(bd, numInts); + bd.finishWrite(); - readInts(memory.getFullBuffers(), numBuffers, numInts); - } + readInts(bd.createReader(), numBuffers, numInts); } @Test public void returnNullAfterEmpty() throws Exception { - try (MemoryMappedBuffers memory = MemoryMappedBuffers.create(createTempPath())) { - memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer()); - memory.finishWrite(); + try (BoundedData bd = createBoundedData()) { + bd.finishWrite(); - final BufferSlicer reader = memory.getFullBuffers(); - assertNotNull(reader.sliceNextBuffer()); + final BoundedData.Reader reader = bd.createReader(); // check that multiple calls now return empty buffers - assertNull(reader.sliceNextBuffer()); - assertNull(reader.sliceNextBuffer()); - assertNull(reader.sliceNextBuffer()); + assertNull(reader.nextBuffer()); + assertNull(reader.nextBuffer()); + assertNull(reader.nextBuffer()); } } @Test public void testDeleteFileOnClose() throws Exception { final Path path = createTempPath(); - final MemoryMappedBuffers mmb = MemoryMappedBuffers.create(path); + final BoundedData bd = createBoundedData(path); assertTrue(Files.exists(path)); - mmb.close(); + bd.close(); assertFalse(Files.exists(path)); } @Test public void testGetSizeSingleRegion() throws Exception { - testGetSize(Integer.MAX_VALUE); + try (BoundedData bd = createBoundedData()) { + testGetSize(bd); + } } @Test public void testGetSizeMultipleRegions() throws Exception { - testGetSize(100_000); + try (BoundedData bd = createBoundedDataWithRegion(100_000)) { + testGetSize(bd); + } } - private static void testGetSize(int regionSize) throws Exception { + private static void testGetSize(BoundedData bd) throws Exception { final int bufferSize1 = 60_787; final int bufferSize2 = 76_687; - final int expectedSize1 = bufferSize1 + BufferToByteBuffer.HEADER_LENGTH; - 
final int expectedSizeFinal = bufferSize1 + bufferSize2 + 2 * BufferToByteBuffer.HEADER_LENGTH; - - try (MemoryMappedBuffers memory = MemoryMappedBuffers.createWithRegionSize(createTempPath(), regionSize)) { + final int expectedSize1 = bufferSize1 + BufferReaderWriterUtil.HEADER_LENGTH; + final int expectedSizeFinal = bufferSize1 + bufferSize2 + 2 * BufferReaderWriterUtil.HEADER_LENGTH; - memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize1)); - assertEquals(expectedSize1, memory.getSize()); + bd.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize1)); + assertEquals(expectedSize1, bd.getSize()); - memory.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize2)); - assertEquals(expectedSizeFinal, memory.getSize()); + bd.writeBuffer(BufferBuilderTestUtils.buildSomeBuffer(bufferSize2)); + assertEquals(expectedSizeFinal, bd.getSize()); - memory.finishWrite(); - assertEquals(expectedSizeFinal, memory.getSize()); - } + bd.finishWrite(); + assertEquals(expectedSizeFinal, bd.getSize()); } // ------------------------------------------------------------------------ // utils // ------------------------------------------------------------------------ - private static int writeInts(MemoryMappedBuffers memory, int numInts) throws IOException { - final int bufferSize = 1024 * 1024; // 1 MiByte - final int numIntsInBuffer = bufferSize / 4; + private static int writeInts(BoundedData bd, int numInts) throws IOException { + final int numIntsInBuffer = BUFFER_SIZE / 4; int numBuffers = 0; for (int nextValue = 0; nextValue < numInts; nextValue += numIntsInBuffer) { - Buffer buffer = BufferBuilderTestUtils.buildBufferWithAscendingInts(bufferSize, numIntsInBuffer, nextValue); - memory.writeBuffer(buffer); + Buffer buffer = BufferBuilderTestUtils.buildBufferWithAscendingInts(BUFFER_SIZE, numIntsInBuffer, nextValue); + bd.writeBuffer(buffer); numBuffers++; } return numBuffers; } - private static void readInts(MemoryMappedBuffers.BufferSlicer memory, int 
numBuffersExpected, int numInts) throws IOException { + private static void readInts(BoundedData.Reader reader, int numBuffersExpected, int numInts) throws IOException { Buffer b; int nextValue = 0; int numBuffers = 0; - while ((b = memory.sliceNextBuffer()) != null) { + while ((b = reader.nextBuffer()) != null) { final int numIntsInBuffer = b.getSize() / 4; BufferBuilderTestUtils.validateBufferWithAscendingInts(b, numIntsInBuffer, nextValue); nextValue += numIntsInBuffer; numBuffers++; + + b.recycleBuffer(); } assertEquals(numBuffersExpected, numBuffers); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java new file mode 100644 index 0000000..606dedf --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.StandardOpenOption; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link BufferReaderWriterUtil}. + */ +public class BufferReaderWriterUtilTest { + + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + + // ------------------------------------------------------------------------ + // Byte Buffer + // ------------------------------------------------------------------------ + + @Test + public void writeReadByteBuffer() { + final ByteBuffer memory = ByteBuffer.allocateDirect(1200); + final Buffer buffer = createTestBuffer(); + + BufferReaderWriterUtil.writeBuffer(buffer, memory); + final int pos = memory.position(); + memory.flip(); + Buffer result = BufferReaderWriterUtil.sliceNextBuffer(memory); + + assertEquals(pos, memory.position()); + validateTestBuffer(result); + } + + @Test + public void writeByteBufferNotEnoughSpace() { + final ByteBuffer memory = ByteBuffer.allocateDirect(10); + final Buffer buffer = createTestBuffer(); + + final boolean written = BufferReaderWriterUtil.writeBuffer(buffer, memory); + + assertFalse(written); + assertEquals(0, memory.position()); + assertEquals(memory.capacity(), 
memory.limit()); + } + + @Test + public void readFromEmptyByteBuffer() { + final ByteBuffer memory = ByteBuffer.allocateDirect(100); + memory.position(memory.limit()); + + final Buffer result = BufferReaderWriterUtil.sliceNextBuffer(memory); + + assertNull(result); + } + + @Test + public void testReadFromByteBufferNotEnoughData() { + final ByteBuffer memory = ByteBuffer.allocateDirect(1200); + final Buffer buffer = createTestBuffer(); + BufferReaderWriterUtil.writeBuffer(buffer, memory); + + memory.flip().limit(memory.limit() - 1); + ByteBuffer tooSmall = memory.slice(); + + try { + BufferReaderWriterUtil.sliceNextBuffer(tooSmall); + fail(); + } + catch (Exception e) { + // expected + } + } + + // ------------------------------------------------------------------------ + // File Channel + // ------------------------------------------------------------------------ + + @Test + public void writeReadFileChannel() throws Exception { + final FileChannel fc = tmpFileChannel(); + final Buffer buffer = createTestBuffer(); + final MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null); + + BufferReaderWriterUtil.writeToByteChannel(fc, buffer, BufferReaderWriterUtil.allocatedWriteBufferArray()); + fc.position(0); + + Buffer result = BufferReaderWriterUtil.readFromByteChannel( + fc, BufferReaderWriterUtil.allocatedHeaderBuffer(), readBuffer, FreeingBufferRecycler.INSTANCE); + + validateTestBuffer(result); + } + + @Test + public void readPrematureEndOfFile1() throws Exception { + final FileChannel fc = tmpFileChannel(); + final Buffer buffer = createTestBuffer(); + final MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null); + + BufferReaderWriterUtil.writeToByteChannel(fc, buffer, BufferReaderWriterUtil.allocatedWriteBufferArray()); + fc.truncate(fc.position() - 1); + fc.position(0); + + try { + BufferReaderWriterUtil.readFromByteChannel( + fc, 
BufferReaderWriterUtil.allocatedHeaderBuffer(), readBuffer, FreeingBufferRecycler.INSTANCE); + fail(); + } + catch (IOException e) { + // expected + } + } + + @Test + public void readPrematureEndOfFile2() throws Exception { + final FileChannel fc = tmpFileChannel(); + final Buffer buffer = createTestBuffer(); + final MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null); + + BufferReaderWriterUtil.writeToByteChannel(fc, buffer, BufferReaderWriterUtil.allocatedWriteBufferArray()); + fc.truncate(2); // less than a header size + fc.position(0); + + try { + BufferReaderWriterUtil.readFromByteChannel( + fc, BufferReaderWriterUtil.allocatedHeaderBuffer(), readBuffer, FreeingBufferRecycler.INSTANCE); + fail(); + } + catch (IOException e) { + // expected + } + } + + // ------------------------------------------------------------------------ + // Mixed + // ------------------------------------------------------------------------ + + @Test + public void writeFileReadMemoryBuffer() throws Exception { + final FileChannel fc = tmpFileChannel(); + final Buffer buffer = createTestBuffer(); + BufferReaderWriterUtil.writeToByteChannel(fc, buffer, BufferReaderWriterUtil.allocatedWriteBufferArray()); + + final ByteBuffer bb = fc.map(MapMode.READ_ONLY, 0, fc.position()).order(ByteOrder.nativeOrder()); + BufferReaderWriterUtil.configureByteBuffer(bb); + fc.close(); + + Buffer result = BufferReaderWriterUtil.sliceNextBuffer(bb); + + validateTestBuffer(result); + } + + // ------------------------------------------------------------------------ + // Util + // ------------------------------------------------------------------------ + + private static FileChannel tmpFileChannel() throws IOException { + return FileChannel.open(TMP_FOLDER.newFile().toPath(), + StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); + } + + private static Buffer createTestBuffer() { + return 
BufferBuilderTestUtils.buildBufferWithAscendingInts(1024, 200, 0); + } + + private static void validateTestBuffer(Buffer buffer) { + BufferBuilderTestUtils.validateBufferWithAscendingInts(buffer, 200, 0); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java deleted file mode 100644 index 55fa496..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferToByteBufferTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.partition; - -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; -import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Reader; -import org.apache.flink.runtime.io.network.partition.BufferToByteBuffer.Writer; - -import org.junit.Test; - -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - -/** - * Tests for the {@link BufferToByteBuffer}. 
- */ -public class BufferToByteBufferTest { - - @Test - public void testCompleteIsSameBufferAsOriginal() { - final ByteBuffer bb = ByteBuffer.allocateDirect(128); - final BufferToByteBuffer.Writer writer = new BufferToByteBuffer.Writer(bb); - - final ByteBuffer result = writer.complete(); - - assertSame(bb, result); - } - - @Test - public void testWriteReadMatchesCapacity() { - final ByteBuffer bb = ByteBuffer.allocateDirect(1200); - testWriteAndReadMultipleBuffers(bb, 100); - } - - @Test - public void testWriteReadWithLeftoverCapacity() { - final ByteBuffer bb = ByteBuffer.allocateDirect(1177); - testWriteAndReadMultipleBuffers(bb, 100); - } - - private void testWriteAndReadMultipleBuffers(ByteBuffer buffer, int numIntsPerBuffer) { - final Writer writer = new Writer(buffer); - - int numBuffers = 0; - while (writer.writeBuffer(BufferBuilderTestUtils.buildBufferWithAscendingInts(1024, numIntsPerBuffer, 0))) { - numBuffers++; - } - - final ByteBuffer bb = writer.complete().slice(); - - final Reader reader = new Reader(bb); - Buffer buf; - while ((buf = reader.sliceNextBuffer()) != null) { - BufferBuilderTestUtils.validateBufferWithAscendingInts(buf, numIntsPerBuffer, 0); - numBuffers--; - } - - assertEquals(0, numBuffers); - } - -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java new file mode 100644 index 0000000..dd1c22b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.junit.AssumptionViolatedException; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. + */ +public class FileChannelBoundedDataTest extends BoundedDataTestBase { + + @Override + protected BoundedData createBoundedData(Path tempFilePath) throws IOException { + return FileChannelBoundedData.create(tempFilePath, BUFFER_SIZE); + } + + @Override + protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException { + throw new AssumptionViolatedException("FileChannelBoundedData is not region based"); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedDataTest.java new file mode 100644 index 0000000..0fe1f42 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedDataTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. + */ +public class FileChannelMemoryMappedBoundedDataTest extends BoundedDataTestBase { + + @Override + protected BoundedData createBoundedData(Path tempFilePath) throws IOException { + return FileChannelMemoryMappedBoundedData.create(tempFilePath); + } + + @Override + protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException { + return FileChannelMemoryMappedBoundedData.createWithRegionSize(tempFilePath, regionSize); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedDataTest.java new file mode 100644 index 0000000..18c5823 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedDataTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import java.io.IOException; +import java.nio.file.Path; + +/** + * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. + */ +public class MemoryMappedBoundedDataTest extends BoundedDataTestBase { + + @Override + protected BoundedData createBoundedData(Path tempFilePath) throws IOException { + return MemoryMappedBoundedData.create(tempFilePath); + } + + @Override + protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException { + return MemoryMappedBoundedData.createWithRegionSize(tempFilePath, regionSize); + } +}
