This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9175bc924ff18f99dfc5fdfe82cc00da38e14a37 Author: Weijie Guo <[email protected]> AuthorDate: Thu Jan 5 15:01:31 2023 +0800 [FLINK-30332][network] Introduce InternalRegionWriteReadUtils to read and write region. --- .../partition/hybrid/HsFileDataIndexImpl.java | 38 +++++++- .../hybrid/InternalRegionWriteReadUtils.java | 102 +++++++++++++++++++++ .../hybrid/InternalRegionWriteReadUtilsTest.java | 86 +++++++++++++++++ 3 files changed, 225 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java index ad919776e5e..de0081b559d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java @@ -169,7 +169,14 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { * <p>Note: This index may not always maintain the longest possible regions. E.g., 2-1, 2-2, 2-3 * are in two separate regions. */ - private static class InternalRegion { + static class InternalRegion { + /** + * {@link InternalRegion} consists of a header and a payload. (firstBufferIndex, + * firstBufferOffset, numBuffers) form the immutable, fixed-size header. The {@code released} + * array is the variable-length payload. This field is the size of the header in bytes. 
+ */ + public static final int HEADER_SIZE = Integer.BYTES + Long.BYTES + Integer.BYTES; + private final int firstBufferIndex; private final long firstBufferOffset; private final int numBuffers; @@ -183,6 +190,14 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { Arrays.fill(released, false); } + InternalRegion( + int firstBufferIndex, long firstBufferOffset, int numBuffers, boolean[] released) { + this.firstBufferIndex = firstBufferIndex; + this.firstBufferOffset = firstBufferOffset; + this.numBuffers = numBuffers; + this.released = released; + } + private boolean containBuffer(int bufferIndex) { return bufferIndex >= firstBufferIndex && bufferIndex < firstBufferIndex + numBuffers; } @@ -203,5 +218,26 @@ public class HsFileDataIndexImpl implements HsFileDataIndex { private void markBufferReleased(int bufferIndex) { released[bufferIndex - firstBufferIndex] = true; } + + /** Get the total size in bytes of this region, including header and payload. */ + int getSize() { + return HEADER_SIZE + numBuffers; + } + + int getFirstBufferIndex() { + return firstBufferIndex; + } + + long getFirstBufferOffset() { + return firstBufferOffset; + } + + int getNumBuffers() { + return numBuffers; + } + + boolean[] getReleased() { + return released; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java new file mode 100644 index 00000000000..08726fb8afb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; +import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; + +/** Utils for reading and writing {@link InternalRegion}s. */ +public class InternalRegionWriteReadUtils { + + /** + * Allocates a direct buffer of the given size and sets its byte order to native order. + * + * @param bufferSize the capacity of the buffer to allocate. + * @return a direct, native-order buffer with the given capacity. + */ + public static ByteBuffer allocateAndConfigureBuffer(int bufferSize) { + ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize); + buffer.order(ByteOrder.nativeOrder()); + return buffer; + } + + /** + * Writes an {@link InternalRegion} to a {@link FileChannel}. + * + * @param channel the channel of the file to write to. + * @param headerBuffer the reusable buffer used to stage the region's header. + * @param region the region to be written to the channel. + */ + public static void writeRegionToFile( + FileChannel channel, ByteBuffer headerBuffer, InternalRegion region) + throws IOException { + // write header: firstBufferIndex (int), numBuffers (int), firstBufferOffset (long). 
+ headerBuffer.clear(); + headerBuffer.putInt(region.getFirstBufferIndex()); + headerBuffer.putInt(region.getNumBuffers()); + headerBuffer.putLong(region.getFirstBufferOffset()); + headerBuffer.flip(); + + // write payload: one byte per buffer, 1 if released and 0 otherwise. + ByteBuffer payloadBuffer = allocateAndConfigureBuffer(region.getNumBuffers()); + boolean[] released = region.getReleased(); + for (boolean b : released) { + payloadBuffer.put(b ? (byte) 1 : (byte) 0); + } + payloadBuffer.flip(); + + BufferReaderWriterUtil.writeBuffers( + channel, + headerBuffer.remaining() + payloadBuffer.remaining(), // staged bytes, not capacity + headerBuffer, + payloadBuffer); + } + + /** + * Reads an {@link InternalRegion} from a {@link FileChannel}. + * + * @param channel the channel to read from. + * @param headerBuffer the reusable buffer used to read the region's header. + * @param position the file position at which the region starts. + * @return the {@link InternalRegion} read from the channel. + */ + public static InternalRegion readRegionFromFile( + FileChannel channel, ByteBuffer headerBuffer, long position) throws IOException { + headerBuffer.clear().limit(InternalRegion.HEADER_SIZE); // header bytes only + BufferReaderWriterUtil.readByteBufferFully(channel, headerBuffer, position); + headerBuffer.flip(); + int firstBufferIndex = headerBuffer.getInt(); + int numBuffers = headerBuffer.getInt(); + long firstBufferOffset = headerBuffer.getLong(); + ByteBuffer payloadBuffer = allocateAndConfigureBuffer(numBuffers); + BufferReaderWriterUtil.readByteBufferFully( + channel, payloadBuffer, position + InternalRegion.HEADER_SIZE); + boolean[] released = new boolean[numBuffers]; + payloadBuffer.flip(); + for (int i = 0; i < numBuffers; i++) { + released[i] = payloadBuffer.get() != 0; + } + return new InternalRegion(firstBufferIndex, firstBufferOffset, numBuffers, released); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java 
new file mode 100644 index 00000000000..e65a8a132d9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.UUID; + +import static org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE; +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals; +import static org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link InternalRegionWriteReadUtils}. 
*/ +class InternalRegionWriteReadUtilsTest { + @Test + void testAllocateAndConfigureBuffer() { + final int bufferSize = 16; + ByteBuffer buffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(bufferSize); + assertThat(buffer.capacity()).isEqualTo(bufferSize); + assertThat(buffer.limit()).isEqualTo(bufferSize); + assertThat(buffer.position()).isZero(); + assertThat(buffer.isDirect()).isTrue(); + assertThat(buffer.order()).isEqualTo(ByteOrder.nativeOrder()); + } + + @Test + void testReadPrematureEndOfFile(@TempDir Path tmpPath) throws Exception { + try (FileChannel channel = tmpFileChannel(tmpPath)) { + ByteBuffer buffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); + InternalRegionWriteReadUtils.writeRegionToFile( + channel, buffer, createSingleUnreleasedRegion(0, 0L, 1)); + channel.truncate(channel.position() - 1); // cut off the last payload byte + assertThatThrownBy( + () -> InternalRegionWriteReadUtils.readRegionFromFile(channel, buffer, 0L)) + .isInstanceOf(IOException.class); + } + } + + @Test + void testWriteAndReadRegion(@TempDir Path tmpPath) throws Exception { + try (FileChannel channel = tmpFileChannel(tmpPath)) { + ByteBuffer buffer = InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE); + InternalRegion region = createSingleUnreleasedRegion(10, 100L, 1); + InternalRegionWriteReadUtils.writeRegionToFile(channel, buffer, region); + InternalRegion readRegion = + InternalRegionWriteReadUtils.readRegionFromFile(channel, buffer, 0L); + assertRegionEquals(readRegion, region); + } + } + + private static FileChannel tmpFileChannel(Path tempPath) throws IOException { + return FileChannel.open( + Files.createFile(tempPath.resolve(UUID.randomUUID().toString())), + StandardOpenOption.CREATE, + StandardOpenOption.READ, + StandardOpenOption.WRITE); + } +}
