reswqa commented on code in PR #22982:
URL: https://github.com/apache/flink/pull/22982#discussion_r1265154015


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataIndexImpl.java:
##########
@@ -223,32 +239,49 @@ private HsFileDataIndex.ReadableRegion toReadableRegion(
                 }
                 ++nReadable;
             }
-            return new ReadableRegion(nSkip, nReadable, firstBufferOffset);
+            return new ReadableRegion(nSkip, nReadable, regionFileOffset);
         }
 
         private void markBufferReleased(int bufferIndex) {
             released[bufferIndex - firstBufferIndex] = true;
         }
 
-        /** Get the total size in bytes of this region, including header and 
payload. */
-        int getSize() {
-            return HEADER_SIZE + numBuffers;
+        public boolean[] getReleased() {
+            return released;
         }
+    }
 
-        int getFirstBufferIndex() {
-            return firstBufferIndex;
-        }
+    /**
+     * The implementation of {@link FileDataIndexRegionHelper} to writing a 
region to the file or
+     * reading a region from the file.
+     *
+     * <p>Note that this type of region's length may be variable because it 
contains an array to
+     * indicate each buffer's release state.
+     */
+    public static class HsFileDataIndexRegionHelperImpl

Review Comment:
   ```suggestion
       public static class HsFileDataIndexRegionHelper
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/file/ProducerMergedPartitionFileIndex.java:
##########
@@ -205,6 +212,38 @@ long getFileOffset() {
         }
     }
 
+    /**
+     * The implementation of {@link FileDataIndexRegionHelper} to writing a 
region to the file or
+     * reading a region from the file.
+     *
+     * <p>Note that this type of region's length is fixed.
+     */
+    static class ProducerMergedPartitionFileDataIndexRegionHelperImpl

Review Comment:
   ```suggestion
       static class ProducerMergedPartitionFileDataIndexRegionHelper
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexSpilledRegionManagerImpl.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.index;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+
+/**
+ * Default implementation of {@link FileDataIndexSpilledRegionManager}. This 
manager will handle and
+ * spill regions in the following way:
+ *
+ * <ul>
+ *   <li>All regions will be written to the same file, namely index file.
+ *   <li>Multiple regions belonging to the same subpartition form a region 
group.
+ *   <li>The regions in the same region group have no special relationship, 
but are only related to
+ *       the order in which they are spilled.
+ *   <li>Each region group is independent. Even if the previous region group 
is not full, the next
+ *       region group can still be allocated.
+ *   <li>If a region has been written to the index file already, spill it 
again will overwrite the
+ *       previous region.
+ *   <li>The very large region will monopolize a single region group.
+ * </ul>
+ *
+ * <p>The relationships between index file and region group are shown below.
+ *
+ * <pre>
+ *
+ *         - - - - - - - - - Index File - - — - - - - - - - - -
+ *        |                                                     |
+ *        | - - — -RegionGroup1 - -   - - RegionGroup2- - - -   |
+ *        ||SP1 R1||SP1 R2| Free | |SP2 R3| SP2 R1| SP2 R2 |  |
+ *        | - - - - - - - - - - - -   - - - - - - - - - - - -   |
+ *        |                                                     |
+ *        | - - - - - - - -RegionGroup3 - - - - -               |
+ *        ||              Big Region             |              |
+ *        | - - - - - - - - - - - - - - - - - - -               |
+ *         - - - - - - - - - - - - - - - - - - - - - -- - - - -
+ * </pre>
+ */
+public class FileDataIndexSpilledRegionManagerImpl<T extends 
FileDataIndexRegionHelper.Region>

Review Comment:
   This class should be moved and renamed from 
`HsFileDataIndexSpilledRegionManagerImpl` instead of creating a new file. 
Otherwise, it's hard to see what actually changed in the diff.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/index/FileDataIndexCacheTest.java:
##########
@@ -34,41 +35,41 @@
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils.createAllUnreleasedRegions;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link HsFileDataIndexCache}. */
-class HsFileDataIndexCacheTest {
-    private HsFileDataIndexCache indexCache;
+/** Tests for {@link FileDataIndexCache}. */
+class FileDataIndexCacheTest {
+    private FileDataIndexCache<HsFileDataIndexImpl.InternalRegion> indexCache;
 
-    private TestingFileDataIndexSpilledRegionManager spilledRegionManager;
+    private 
TestingFileDataIndexSpilledRegionManager<HsFileDataIndexImpl.InternalRegion>
+            spilledRegionManager;
 
     private final int numSubpartitions = 1;
 
-    private static final int SPILLED_INDEX_SEGMENT_SIZE = 256;
-
     private int numRetainedIndexEntry = 10;
 
     @BeforeEach
     void before(@TempDir Path tmpPath) throws Exception {
         Path indexFilePath = 
Files.createFile(tmpPath.resolve(UUID.randomUUID().toString()));
+        
TestingFileDataIndexSpilledRegionManager.Factory<HsFileDataIndexImpl.InternalRegion>
+                testingSpilledRegionManagerFactory =
+                        new 
TestingFileDataIndexSpilledRegionManager.Factory<>();
         indexCache =
-                new HsFileDataIndexCache(
+                new FileDataIndexCache<>(
                         numSubpartitions,
                         indexFilePath,
                         numRetainedIndexEntry,
-                        
TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE);
-        spilledRegionManager =
-                TestingFileDataIndexSpilledRegionManager.Factory.INSTANCE
-                        .getLastSpilledRegionManager();
+                        testingSpilledRegionManagerFactory);
+        spilledRegionManager = 
testingSpilledRegionManagerFactory.getLastSpilledRegionManager();
     }
 
     @Test
     void testPutAndGet() {
         indexCache.put(0, createAllUnreleasedRegions(0, 0L, 3, 1));
-        Optional<InternalRegion> regionOpt = indexCache.get(0, 0);
+        Optional<HsFileDataIndexImpl.InternalRegion> regionOpt = 
indexCache.get(0, 0);

Review Comment:
   It's a little confusing that we test `HsFileDataIndexImpl.InternalRegion` 
in `FileDataIndexCacheTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to