This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e858969f27d [FLINK-36959][runtime] Fix the logic for computing readable buffer offsets and sizes when including empty buffers.
e858969f27d is described below

commit e858969f27d014d5b025ba2c8ca920fb79161efe
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Tue Dec 24 23:57:10 2024 +0800

    [FLINK-36959][runtime] Fix the logic for computing readable buffer offsets and sizes when including empty buffers.
---
 .../network/partition/PartitionedFileReader.java   |   6 +-
 .../network/partition/PartitionedFileWriter.java   |  50 +++++++++-
 .../partition/SortMergeResultPartition.java        |   3 +-
 .../io/network/partition/PartitionTestUtils.java   |   7 +-
 .../partition/PartitionedFileWriteReadTest.java    | 105 +++++++++++++++++----
 5 files changed, 146 insertions(+), 25 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
index 6d5c1d9e58a..582e1637c44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
@@ -201,15 +201,13 @@ class PartitionedFileReader {
         long endPartitionOffset = indexEntryBuf.getLong();
         long endPartitionSize = indexEntryBuf.getLong();
 
-        if (startPartitionOffset != endPartitionOffset) {
+        if (startPartitionOffset != endPartitionOffset || startPartitionSize != endPartitionSize) {
             offsetAndSizesToRead.add(
                     Tuple2.of(
                             startPartitionOffset,
                             endPartitionOffset + endPartitionSize - startPartitionOffset));
         } else if (startPartitionSize != 0) {
-            checkArgument(
-                    startPartitionSize == endPartitionSize,
-                    "Offsets need to be either contiguous or all the same.");
+            // this branch is for broadcast subpartitions
             for (int i = startSubpartition; i <= endSubpartition; i++) {
                 offsetAndSizesToRead.add(Tuple2.of(startPartitionOffset, startPartitionSize));
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
index ba3582aa7f9..bd46b18e8cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -105,14 +106,24 @@ public class PartitionedFileWriter implements AutoCloseable {
     /** Whether this file writer is closed or not. */
     private boolean isClosed;
 
-    public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, String basePath)
+    private final int[] writeOrder;
+
+    /** Total number of bytes written before the current region. */
+    private long preRegionTotalBytesWritten;
+
+    public PartitionedFileWriter(
+            int numSubpartitions, int maxIndexBufferSize, String basePath, int[] writeOrder)
             throws IOException {
-        this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath);
+        this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath, writeOrder);
     }
 
     @VisibleForTesting
     PartitionedFileWriter(
-            int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, String basePath)
+            int numSubpartitions,
+            int minIndexBufferSize,
+            int maxIndexBufferSize,
+            String basePath,
+            int[] writeOrder)
             throws IOException {
         checkArgument(numSubpartitions > 0, "Illegal number of subpartitions.");
         checkArgument(maxIndexBufferSize > 0, "Illegal maximum index cache size.");
@@ -124,6 +135,7 @@ public class PartitionedFileWriter implements AutoCloseable {
         this.subpartitionBytes = new long[numSubpartitions];
         this.dataFilePath = new File(basePath + PartitionedFile.DATA_FILE_SUFFIX).toPath();
         this.indexFilePath = new File(basePath + PartitionedFile.INDEX_FILE_SUFFIX).toPath();
+        this.writeOrder = checkNotNull(writeOrder);
 
         this.indexBuffer = ByteBuffer.allocate(minIndexBufferSize);
         BufferReaderWriterUtil.configureByteBuffer(indexBuffer);
@@ -195,6 +207,7 @@ public class PartitionedFileWriter implements AutoCloseable {
 
     private void writeRegionIndex() throws IOException {
         if (Arrays.stream(subpartitionBytes).sum() > 0) {
+            updateEmptySubpartitionOffsets();
             for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
                 writeIndexEntry(subpartitionOffsets[subpartition], subpartitionBytes[subpartition]);
             }
@@ -202,6 +215,37 @@ public class PartitionedFileWriter implements AutoCloseable {
             currentSubpartition = -1;
             ++numRegions;
             Arrays.fill(subpartitionBytes, 0);
+            preRegionTotalBytesWritten = totalBytesWritten;
+        }
+    }
+
+    /**
+     * Updates the offsets of subpartitions, ensuring that they are contiguous.
+     *
+     * <p>This method is necessary because empty subpartitions do not trigger an update to their
+     * offsets during the usual process. As such, we need to ensure here that every subpartition,
+     * including empty ones, has its offset updated to maintain continuity. This process involves
+     * adjusting each subpartition's offset based on the sum of previous subpartitions' bytes,
+     * ensuring seamless data handling and storage alignment.
+     */
+    private void updateEmptySubpartitionOffsets() {
+        for (int i = 0; i < writeOrder.length; i++) {
+            int currentSubPartition = writeOrder[i];
+
+            if (subpartitionBytes[currentSubPartition] == 0) {
+                if (i == 0) {
+                    // For the first subpartition, set its offset to the current pre-region total
+                    // bytes written if it's empty.
+                    subpartitionOffsets[currentSubPartition] = preRegionTotalBytesWritten;
+                } else {
+                    // For non-first subpartitions, update the offset of an empty subpartition to be
+                    // contiguous with the previous subpartition.
+                    int preSubPartition = writeOrder[i - 1];
+                    subpartitionOffsets[currentSubPartition] =
+                            subpartitionOffsets[preSubPartition]
+                                    + subpartitionBytes[preSubPartition];
+                }
+            }
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 6255aab17b7..99b6856ed04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -180,7 +180,8 @@ public class SortMergeResultPartition extends ResultPartition {
             try {
                 // allocate at most 4M heap memory for caching of index entries
                 fileWriter =
-                        new PartitionedFileWriter(numSubpartitions, 4194304, resultFileBasePath);
+                        new PartitionedFileWriter(
+                                numSubpartitions, 4194304, resultFileBasePath, subpartitionOrder);
             } catch (Throwable throwable) {
                 throw new IOException("Failed to create file writer.", throwable);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 994e7295e19..20547adc2c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -164,8 +164,13 @@ public enum PartitionTestUtils {
             }
         }
 
+        int[] writeOrder = new int[numSubpartitions];
+        for (int i = 0; i < numSubpartitions; i++) {
+            writeOrder[i] = i;
+        }
+
         PartitionedFileWriter fileWriter =
-                new PartitionedFileWriter(numSubpartitions, 1024, basePath);
+                new PartitionedFileWriter(numSubpartitions, 1024, basePath, writeOrder);
         fileWriter.startNewRegion(false);
         fileWriter.writeBuffers(buffers);
         return fileWriter.finish();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index 7177ccc1224..587b27727d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -85,7 +85,7 @@ class PartitionedFileWriteReadTest {
                         numRegions,
                         buffersWritten,
                         regionStat,
-                        createPartitionedFileWriter(numSubpartitions),
+                        createPartitionedFileWriter(numSubpartitions, writeOrder),
                         subpartitionIndex -> subpartitionIndex,
                         random.nextBoolean(),
                         writeOrder);
@@ -151,7 +151,7 @@ class PartitionedFileWriteReadTest {
                         numRegions,
                         buffersWritten,
                         regionStat,
-                        createPartitionedFileWriter(numSubpartitions),
+                        createPartitionedFileWriter(numSubpartitions, writeOrder),
                         subpartitionIndex -> subpartitionIndex,
                         broadcastRegion,
                         writeOrder);
@@ -286,7 +286,7 @@ class PartitionedFileWriteReadTest {
                         numRegions,
                         buffersWritten,
                         regionStat,
-                        createPartitionedFileWriter(numSubpartitions),
+                        createPartitionedFileWriter(numSubpartitions, writeOrder),
                         subpartitionIndex -> subpartitionIndex / 2,
                         false,
                         writeOrder);
@@ -419,7 +419,10 @@ class PartitionedFileWriteReadTest {
     }
 
     private static Queue<MemorySegment> allocateBuffers(int bufferSize) {
-        int numBuffers = 2;
+        return allocateBuffers(bufferSize, 2);
+    }
+
+    private static Queue<MemorySegment> allocateBuffers(int bufferSize, int numBuffers) {
         Queue<MemorySegment> readBuffers = new LinkedList<>();
         while (numBuffers-- > 0) {
             readBuffers.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
@@ -441,7 +444,8 @@ class PartitionedFileWriteReadTest {
             buffersRead[subpartition] = new ArrayList<>();
         }
 
-        PartitionedFileWriter fileWriter = createPartitionedFileWriter(numSubpartitions);
+        PartitionedFileWriter fileWriter =
+                createPartitionedFileWriter(numSubpartitions, new int[] {0, 1, 2, 3, 4});
         for (int region = 0; region < numRegions; ++region) {
             fileWriter.startNewRegion(false);
             for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
@@ -481,6 +485,68 @@ class PartitionedFileWriteReadTest {
         IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
     }
 
+    @Test
+    void testWriteAndReadWithEmptySubpartitionForMultipleSubpartitions() throws Exception {
+        int numRegions = 10;
+        int numSubpartitions = 5;
+        int bufferSize = 1024;
+        Random random = new Random();
+
+        Queue<Buffer>[] subpartitionBuffers = new ArrayDeque[numRegions];
+        List<Buffer>[] buffersRead = new List[numRegions];
+        for (int region = 0; region < numRegions; region++) {
+            subpartitionBuffers[region] = new ArrayDeque<>();
+            buffersRead[region] = new ArrayList<>();
+        }
+
+        int[] writeOrder = new int[] {0, 1, 2, 3, 4};
+        PartitionedFileWriter fileWriter =
+                createPartitionedFileWriter(numSubpartitions, writeOrder);
+        for (int region = 0; region < numRegions; ++region) {
+            fileWriter.startNewRegion(false);
+            for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+                if (random.nextBoolean()) {
+                    Buffer buffer = createBuffer(random, bufferSize);
+                    subpartitionBuffers[region].add(buffer);
+                    fileWriter.writeBuffers(getBufferWithSubpartitions(buffer, subpartition));
+                }
+            }
+        }
+        PartitionedFile partitionedFile = fileWriter.finish();
+
+        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
+        FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
+        PartitionedFileReader fileReader =
+                new PartitionedFileReader(
+                        partitionedFile,
+                        new ResultSubpartitionIndexSet(0, numSubpartitions - 1),
+                        dataFileChannel,
+                        indexFileChannel,
+                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
+                        createAndConfigIndexEntryBuffer(),
+                        writeOrder[0]);
+        int regionIndex = 0;
+        while (fileReader.hasRemaining()) {
+            if (subpartitionBuffers[regionIndex].isEmpty()) {
+                regionIndex++;
+            } else {
+                int finalRegionIndex = regionIndex;
+                fileReader.readCurrentRegion(
+                        allocateBuffers(bufferSize, 10),
+                        FreeingBufferRecycler.INSTANCE,
+                        buffer -> addReadBuffer(buffer, buffersRead[finalRegionIndex]));
+                for (Buffer buffer : buffersRead[finalRegionIndex]) {
+                    assertBufferEquals(
+                            checkNotNull(subpartitionBuffers[finalRegionIndex].poll()), buffer);
+                }
+
+                assertThat(subpartitionBuffers[finalRegionIndex]).isEmpty();
+                regionIndex++;
+            }
+        }
+        IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
+    }
+
     private void assertBufferEquals(Buffer expected, Buffer actual) {
         assertThat(expected.getDataType()).isEqualTo(actual.getDataType());
         assertThat(expected.getNioBufferReadable()).isEqualTo(actual.getNioBufferReadable());
@@ -498,7 +564,8 @@ class PartitionedFileWriteReadTest {
 
     @Test
     void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
-        PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
+        PartitionedFileWriter partitionedFileWriter =
+                createPartitionedFileWriter(2, new int[] {1, 0});
         try {
             MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
 
@@ -609,7 +676,8 @@ class PartitionedFileWriteReadTest {
                         createPartitionedFileWriter(
                                 numSubpartitions,
-                                PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
-                                PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions),
+                                PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
+                                writeOrder),
                         subpartitionIndex -> subpartitionIndex,
                         random.nextBoolean(),
                         writeOrder);
@@ -659,29 +727,34 @@ class PartitionedFileWriteReadTest {
     }
 
     private PartitionedFile createEmptyPartitionedFile() throws IOException {
-        PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
+        PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2, new int[0]);
         return partitionedFileWriter.finish();
     }
 
-    private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions)
-            throws IOException {
-        return createPartitionedFileWriter(numSubpartitions, 640);
+    private PartitionedFileWriter createPartitionedFileWriter(
+            int numSubpartitions, int[] writeOrder) throws IOException {
+        return createPartitionedFileWriter(numSubpartitions, 640, writeOrder);
     }
 
     private PartitionedFileWriter createPartitionedFileWriter(
-            int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize)
+            int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, int[] writeOrder)
             throws IOException {
         return new PartitionedFileWriter(
-                numSubpartitions, minIndexBufferSize, maxIndexBufferSize, tempPath.toString());
+                numSubpartitions,
+                minIndexBufferSize,
+                maxIndexBufferSize,
+                tempPath.toString(),
+                writeOrder);
     }
 
     private PartitionedFileWriter createPartitionedFileWriter(
-            int numSubpartitions, int maxIndexBufferSize) throws IOException {
-        return new PartitionedFileWriter(numSubpartitions, maxIndexBufferSize, tempPath.toString());
+            int numSubpartitions, int maxIndexBufferSize, int[] writeOrder) throws IOException {
+        return new PartitionedFileWriter(
+                numSubpartitions, maxIndexBufferSize, tempPath.toString(), writeOrder);
     }
 
     private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
-        PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1);
+        PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(1, new int[0]);
         partitionedFileWriter.finish();
         return partitionedFileWriter;
     }
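
For context, the contiguity rule that the writer-side change enforces (and that the new reader-side condition relies on) can be illustrated with a small standalone sketch. This is not the committed code; the method name makeOffsetsContiguous and its parameters are hypothetical and only mirror the idea behind PartitionedFileWriter#updateEmptySubpartitionOffsets:

    // Hypothetical sketch: give every empty subpartition an offset that is
    // contiguous with its predecessor in write order, so a region's index
    // entries describe one unbroken byte range even when some subpartitions
    // wrote no data.
    static void makeOffsetsContiguous(long[] offsets, long[] bytes, int[] writeOrder, long regionStart) {
        for (int i = 0; i < writeOrder.length; i++) {
            int subpartition = writeOrder[i];
            if (bytes[subpartition] == 0) {
                if (i == 0) {
                    // the first subpartition in write order starts at the region's base offset
                    offsets[subpartition] = regionStart;
                } else {
                    // an empty subpartition begins exactly where the previous one ends
                    int previous = writeOrder[i - 1];
                    offsets[subpartition] = offsets[previous] + bytes[previous];
                }
            }
        }
    }

With offsets kept contiguous this way, the reader can compute one readable range as endPartitionOffset + endPartitionSize - startPartitionOffset whenever the start and end index entries differ in offset or size, leaving the equal-offset, equal-size case to the broadcast branch.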
