xintongsong commented on code in PR #22982:
URL: https://github.com/apache/flink/pull/22982#discussion_r1263491441


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/region/FileRegionManager.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.region;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * {@link FileRegionManager} is responsible for writing a {@link Region} to 
the file or reading a
+ * {@link Region} from file.
+ */
+public interface FileRegionManager<T extends FileRegionManager.Region> {

Review Comment:
   I'd suggest the name `FileDataIndexRegionHelper`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java:
##########
@@ -223,32 +238,48 @@ private HsFileDataIndex.ReadableRegion toReadableRegion(
                 }
                 ++nReadable;
             }
-            return new ReadableRegion(nSkip, nReadable, firstBufferOffset);
+            return new ReadableRegion(nSkip, nReadable, regionFileOffset);
         }
 
         private void markBufferReleased(int bufferIndex) {
             released[bufferIndex - firstBufferIndex] = true;
         }
 
-        /** Get the total size in bytes of this region, including header and 
payload. */
-        int getSize() {
-            return HEADER_SIZE + numBuffers;
+        public boolean[] getReleased() {
+            return released;
         }
+    }
 
-        int getFirstBufferIndex() {
-            return firstBufferIndex;
-        }
+    /**
+     * The implementation of {@link FileRegionManager} to writing a region to 
the file or reading a
+     * region from the file.
+     *
+     * <p>Note that this type of region's length may be variable because it 
contains an array to
+     * indicate each buffer's release state.
+     */
+    public static class HsFileRegionManagerImpl<T extends 
FileRegionManager.Region>

Review Comment:
   Why is `HsFileRegionManagerImpl` generic?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/region/FileRegionManager.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.region;

Review Comment:
   I'd suggest the package name `xxx.hybrid.index`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##########
@@ -205,6 +212,38 @@ long getFileOffset() {
         }
     }
 
+    /**
+     * The implementation of {@link FileRegionManager} to writing a region to 
the file or reading a
+     * region from the file.
+     *
+     * <p>Note that this type of region's length is fixed.
+     */
+    static class ProducerMergedPartitionFileRegionManagerImpl<T extends 
FileRegionManager.Region>
+            implements FileRegionManager<T> {
+
+        /** Reusable buffer used to read and write the immutable part of 
region. */
+        private final ByteBuffer regionBuffer =
+                allocateAndConfigureBuffer(FixedSizeRegion.REGION_SIZE);
+
+        static final FileRegionManager<FixedSizeRegion> INSTANCE =
+                new ProducerMergedPartitionFileRegionManagerImpl<>();
+
+        private ProducerMergedPartitionFileRegionManagerImpl() {}
+
+        @Override
+        public void writeRegionToFile(FileChannel channel, T region) throws 
IOException {
+            FileRegionWriteReadUtils.writeFixedSizeRegionToFile(channel, 
regionBuffer, region);
+        }
+
+        @Override
+        public T readRegionFromFile(FileChannel channel, long fileOffset) 
throws IOException {
+            FileRegionManager.Region region =

Review Comment:
   No need for local variable.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##########
@@ -205,6 +212,38 @@ long getFileOffset() {
         }
     }
 
+    /**
+     * The implementation of {@link FileRegionManager} to writing a region to 
the file or reading a
+     * region from the file.
+     *
+     * <p>Note that this type of region's length is fixed.
+     */
+    static class ProducerMergedPartitionFileRegionManagerImpl<T extends 
FileRegionManager.Region>

Review Comment:
   Same here. This needs not to be generic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to