This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9175bc924ff18f99dfc5fdfe82cc00da38e14a37
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Jan 5 15:01:31 2023 +0800

    [FLINK-30332][network] Introduce InternalRegionWriteReadUtils to read and 
write region.
---
 .../partition/hybrid/HsFileDataIndexImpl.java      |  38 +++++++-
 .../hybrid/InternalRegionWriteReadUtils.java       | 102 +++++++++++++++++++++
 .../hybrid/InternalRegionWriteReadUtilsTest.java   |  86 +++++++++++++++++
 3 files changed, 225 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
index ad919776e5e..de0081b559d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java
@@ -169,7 +169,14 @@ public class HsFileDataIndexImpl implements 
HsFileDataIndex {
      * <p>Note: This index may not always maintain the longest possible 
regions. E.g., 2-1, 2-2, 2-3
      * are in two separate regions.
      */
-    private static class InternalRegion {
+    static class InternalRegion {
+        /**
+         * {@link InternalRegion} is consists of header and payload. 
(firstBufferIndex,
+         * firstBufferOffset, numBuffer) are immutable header part that have 
fixed size. The array
+         * of released is variable payload. This field represents the size of 
header.
+         */
+        public static final int HEADER_SIZE = Integer.BYTES + Long.BYTES + 
Integer.BYTES;
+
         private final int firstBufferIndex;
         private final long firstBufferOffset;
         private final int numBuffers;
@@ -183,6 +190,14 @@ public class HsFileDataIndexImpl implements 
HsFileDataIndex {
             Arrays.fill(released, false);
         }
 
+        InternalRegion(
+                int firstBufferIndex, long firstBufferOffset, int numBuffers, 
boolean[] released) {
+            this.firstBufferIndex = firstBufferIndex;
+            this.firstBufferOffset = firstBufferOffset;
+            this.numBuffers = numBuffers;
+            this.released = released;
+        }
+
         private boolean containBuffer(int bufferIndex) {
             return bufferIndex >= firstBufferIndex && bufferIndex < 
firstBufferIndex + numBuffers;
         }
@@ -203,5 +218,26 @@ public class HsFileDataIndexImpl implements 
HsFileDataIndex {
         private void markBufferReleased(int bufferIndex) {
             released[bufferIndex - firstBufferIndex] = true;
         }
+
+        /** Get the total size in bytes of this region, including header and 
payload. */
+        int getSize() {
+            return HEADER_SIZE + numBuffers;
+        }
+
+        int getFirstBufferIndex() {
+            return firstBufferIndex;
+        }
+
+        long getFirstBufferOffset() {
+            return firstBufferOffset;
+        }
+
+        int getNumBuffers() {
+            return numBuffers;
+        }
+
+        boolean[] getReleased() {
+            return released;
+        }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java
new file mode 100644
index 00000000000..08726fb8afb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
+/** Utils for read and write {@link InternalRegion}. */
+public class InternalRegionWriteReadUtils {
+
+    /**
+     * Allocate a buffer with specific size and configure it to native order.
+     *
+     * @param bufferSize the size of buffer to allocate.
+     * @return a native order buffer with expected size.
+     */
+    public static ByteBuffer allocateAndConfigureBuffer(int bufferSize) {
+        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+        buffer.order(ByteOrder.nativeOrder());
+        return buffer;
+    }
+
+    /**
+     * Write {@link InternalRegion} to {@link FileChannel}.
+     *
+     * @param channel the file's channel to write.
+     * @param headerBuffer the buffer to write {@link InternalRegion}'s header.
+     * @param region the region to be written to channel.
+     */
+    public static void writeRegionToFile(
+            FileChannel channel, ByteBuffer headerBuffer, InternalRegion 
region)
+            throws IOException {
+        // write header buffer.
+        headerBuffer.clear();
+        headerBuffer.putInt(region.getFirstBufferIndex());
+        headerBuffer.putInt(region.getNumBuffers());
+        headerBuffer.putLong(region.getFirstBufferOffset());
+        headerBuffer.flip();
+
+        // write payload buffer.
+        ByteBuffer payloadBuffer = 
allocateAndConfigureBuffer(region.getNumBuffers());
+        boolean[] released = region.getReleased();
+        for (boolean b : released) {
+            payloadBuffer.put(b ? (byte) 1 : (byte) 0);
+        }
+        payloadBuffer.flip();
+
+        BufferReaderWriterUtil.writeBuffers(
+                channel,
+                headerBuffer.capacity() + payloadBuffer.capacity(),
+                headerBuffer,
+                payloadBuffer);
+    }
+
+    /**
+     * Read {@link InternalRegion} from {@link FileChannel}.
+     *
+     * @param channel the channel to read.
+     * @param headerBuffer the buffer to read {@link InternalRegion}'s header.
+     * @param position position to start read.
+     * @return the {@link InternalRegion} that read from this channel.
+     */
+    public static InternalRegion readRegionFromFile(
+            FileChannel channel, ByteBuffer headerBuffer, long position) 
throws IOException {
+        headerBuffer.clear();
+        BufferReaderWriterUtil.readByteBufferFully(channel, headerBuffer, 
position);
+        headerBuffer.flip();
+        int firstBufferIndex = headerBuffer.getInt();
+        int numBuffers = headerBuffer.getInt();
+        long firstBufferOffset = headerBuffer.getLong();
+        ByteBuffer payloadBuffer = allocateAndConfigureBuffer(numBuffers);
+        BufferReaderWriterUtil.readByteBufferFully(
+                channel, payloadBuffer, position + InternalRegion.HEADER_SIZE);
+        boolean[] released = new boolean[numBuffers];
+        payloadBuffer.flip();
+        for (int i = 0; i < numBuffers; i++) {
+            released[i] = payloadBuffer.get() != 0;
+        }
+        return new InternalRegion(firstBufferIndex, firstBufferOffset, 
numBuffers, released);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java
new file mode 100644
index 00000000000..e65a8a132d9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/InternalRegionWriteReadUtilsTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.UUID;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.InternalRegion.HEADER_SIZE;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.assertRegionEquals;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createSingleUnreleasedRegion;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link InternalRegionWriteReadUtils}. */
+class InternalRegionWriteReadUtilsTest {
+    @Test
+    void testAllocateAndConfigureBuffer() {
+        final int bufferSize = 16;
+        ByteBuffer buffer = 
InternalRegionWriteReadUtils.allocateAndConfigureBuffer(bufferSize);
+        assertThat(buffer.capacity()).isEqualTo(16);
+        assertThat(buffer.limit()).isEqualTo(16);
+        assertThat(buffer.position()).isZero();
+        assertThat(buffer.isDirect()).isTrue();
+        assertThat(buffer.order()).isEqualTo(ByteOrder.nativeOrder());
+    }
+
+    @Test
+    void testReadPrematureEndOfFile(@TempDir Path tmpPath) throws Exception {
+        FileChannel channel = tmpFileChannel(tmpPath);
+        ByteBuffer buffer = 
InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
+        InternalRegionWriteReadUtils.writeRegionToFile(
+                channel, buffer, createSingleUnreleasedRegion(0, 0L, 1));
+        channel.truncate(channel.position() - 1);
+        buffer.flip();
+        assertThatThrownBy(
+                        () -> 
InternalRegionWriteReadUtils.readRegionFromFile(channel, buffer, 0L))
+                .isInstanceOf(IOException.class);
+    }
+
+    @Test
+    void testWriteAndReadRegion(@TempDir Path tmpPath) throws Exception {
+        FileChannel channel = tmpFileChannel(tmpPath);
+        ByteBuffer buffer = 
InternalRegionWriteReadUtils.allocateAndConfigureBuffer(HEADER_SIZE);
+        InternalRegion region = createSingleUnreleasedRegion(10, 100L, 1);
+        InternalRegionWriteReadUtils.writeRegionToFile(channel, buffer, 
region);
+        buffer.flip();
+        InternalRegion readRegion =
+                InternalRegionWriteReadUtils.readRegionFromFile(channel, 
buffer, 0L);
+        assertRegionEquals(readRegion, region);
+    }
+
+    private static FileChannel tmpFileChannel(Path tempPath) throws 
IOException {
+        return FileChannel.open(
+                
Files.createFile(tempPath.resolve(UUID.randomUUID().toString())),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.READ,
+                StandardOpenOption.WRITE);
+    }
+}

Reply via email to