This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63443aec09ece8596321328273c1e431e5029c4d
Author: Weijie Guo <[email protected]>
AuthorDate: Tue May 9 16:08:36 2023 +0800

    [FLINK-32027][runtime] Fix the potential concurrent reading bug of index 
file for SortMergeShuffle.
    
    This closes #22549
---
 .../io/network/partition/PartitionedFile.java      |   6 +-
 .../network/partition/PartitionedFileWriter.java   |  10 +-
 .../partition/PartitionedFileWriteReadTest.java    | 201 +++++++++++++++++----
 3 files changed, 174 insertions(+), 43 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index fee4b313fc2..0630ec8dabd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -142,8 +142,10 @@ public class PartitionedFile {
                 target.put(indexEntryCache.get((int) indexEntryOffset + i));
             }
         } else {
-            indexFile.position(indexEntryOffset);
-            BufferReaderWriterUtil.readByteBufferFully(indexFile, target);
+            synchronized (indexFilePath) {
+                indexFile.position(indexEntryOffset);
+                BufferReaderWriterUtil.readByteBufferFully(indexFile, target);
+            }
         }
         target.flip();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
index d5b738e045b..5168d1097d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
@@ -104,6 +105,13 @@ public class PartitionedFileWriter implements 
AutoCloseable {
 
     public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, 
String basePath)
             throws IOException {
+        this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, 
basePath);
+    }
+
+    @VisibleForTesting
+    PartitionedFileWriter(
+            int numSubpartitions, int minIndexBufferSize, int 
maxIndexBufferSize, String basePath)
+            throws IOException {
         checkArgument(numSubpartitions > 0, "Illegal number of 
subpartitions.");
         checkArgument(maxIndexBufferSize > 0, "Illegal maximum index cache 
size.");
         checkArgument(basePath != null, "Base path must not be null.");
@@ -115,7 +123,7 @@ public class PartitionedFileWriter implements AutoCloseable 
{
         this.dataFilePath = new File(basePath + 
PartitionedFile.DATA_FILE_SUFFIX).toPath();
         this.indexFilePath = new File(basePath + 
PartitionedFile.INDEX_FILE_SUFFIX).toPath();
 
-        this.indexBuffer = ByteBuffer.allocate(MIN_INDEX_BUFFER_SIZE);
+        this.indexBuffer = ByteBuffer.allocate(minIndexBufferSize);
         BufferReaderWriterUtil.configureByteBuffer(indexBuffer);
 
         this.dataFileChannel = openFileChannel(dataFilePath);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index 5883c242542..b7be613ce9c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -59,51 +61,25 @@ class PartitionedFileWriteReadTest {
         int bufferSize = 1024;
         int numBuffers = 1000;
         int numRegions = 10;
-        Random random = new Random(1111);
 
         List<Buffer>[] buffersWritten = new List[numSubpartitions];
         List<Buffer>[] buffersRead = new List[numSubpartitions];
-        List<BufferWithChannel>[] regionBuffers = new List[numSubpartitions];
+        List<Tuple2<Long, Long>>[] regionStat = new List[numSubpartitions];
         for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
             buffersWritten[subpartition] = new ArrayList<>();
             buffersRead[subpartition] = new ArrayList<>();
-            regionBuffers[subpartition] = new ArrayList<>();
+            regionStat[subpartition] = new ArrayList<>();
         }
 
-        PartitionedFileWriter fileWriter = 
createPartitionedFileWriter(numSubpartitions);
-        for (int region = 0; region < numRegions; ++region) {
-            boolean isBroadcastRegion = random.nextBoolean();
-            fileWriter.startNewRegion(isBroadcastRegion);
-
-            for (int i = 0; i < numBuffers; ++i) {
-                Buffer buffer = createBuffer(random, bufferSize);
-                if (isBroadcastRegion) {
-                    for (int subpartition = 0; subpartition < 
numSubpartitions; ++subpartition) {
-                        buffersWritten[subpartition].add(buffer);
-                        regionBuffers[subpartition].add(
-                                new BufferWithChannel(buffer, subpartition));
-                    }
-                } else {
-                    int subpartition = random.nextInt(numSubpartitions);
-                    buffersWritten[subpartition].add(buffer);
-                    regionBuffers[subpartition].add(new 
BufferWithChannel(buffer, subpartition));
-                }
-            }
-
-            int[] writeOrder = 
DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
-            for (int index = 0; index < numSubpartitions; ++index) {
-                int subpartition = writeOrder[index];
-                fileWriter.writeBuffers(regionBuffers[subpartition]);
-                if (isBroadcastRegion) {
-                    break;
-                }
-            }
-
-            for (int index = 0; index < numSubpartitions; ++index) {
-                regionBuffers[index].clear();
-            }
-        }
-        PartitionedFile partitionedFile = fileWriter.finish();
+        PartitionedFile partitionedFile =
+                createPartitionedFile(
+                        numSubpartitions,
+                        bufferSize,
+                        numBuffers,
+                        numRegions,
+                        buffersWritten,
+                        regionStat,
+                        createPartitionedFileWriter(numSubpartitions));
 
         FileChannel dataFileChannel = 
openFileChannel(partitionedFile.getDataFilePath());
         FileChannel indexFileChannel = 
openFileChannel(partitionedFile.getIndexFilePath());
@@ -135,6 +111,71 @@ class PartitionedFileWriteReadTest {
         }
     }
 
+    private PartitionedFile createPartitionedFile(
+            int numSubpartitions,
+            int bufferSize,
+            int numBuffers,
+            int numRegions,
+            List<Buffer>[] buffersWritten,
+            List<Tuple2<Long, Long>>[] regionStat,
+            PartitionedFileWriter fileWriter)
+            throws IOException {
+        Random random = new Random(1111);
+        long currentOffset = 0L;
+        for (int region = 0; region < numRegions; ++region) {
+            boolean isBroadcastRegion = random.nextBoolean();
+            fileWriter.startNewRegion(isBroadcastRegion);
+            List<BufferWithChannel>[] bufferWithChannels = new 
List[numSubpartitions];
+            for (int i = 0; i < numSubpartitions; i++) {
+                bufferWithChannels[i] = new ArrayList<>();
+            }
+
+            for (int i = 0; i < numBuffers; ++i) {
+                Buffer buffer = createBuffer(random, bufferSize);
+                if (isBroadcastRegion) {
+                    for (int subpartition = 0; subpartition < 
numSubpartitions; ++subpartition) {
+                        buffersWritten[subpartition].add(buffer);
+                        bufferWithChannels[subpartition].add(
+                                new BufferWithChannel(buffer, subpartition));
+                    }
+                } else {
+                    int subpartition = random.nextInt(numSubpartitions);
+                    buffersWritten[subpartition].add(buffer);
+                    bufferWithChannels[subpartition].add(
+                            new BufferWithChannel(buffer, subpartition));
+                }
+            }
+
+            int[] writeOrder = 
DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
+            for (int index = 0; index < numSubpartitions; ++index) {
+                int subpartition = writeOrder[index];
+                fileWriter.writeBuffers(bufferWithChannels[subpartition]);
+                long totalBytes = 
getTotalBytes(bufferWithChannels[subpartition]);
+                if (isBroadcastRegion) {
+                    for (int j = 0; j < numSubpartitions; j++) {
+                        regionStat[j].add(Tuple2.of(currentOffset, 
totalBytes));
+                    }
+                    currentOffset += totalBytes;
+                    break;
+                } else {
+                    regionStat[subpartition].add(Tuple2.of(currentOffset, 
totalBytes));
+                    currentOffset += totalBytes;
+                }
+            }
+        }
+        return fileWriter.finish();
+    }
+
+    private static long getTotalBytes(List<BufferWithChannel> 
bufferWithChannels) {
+        long totalBytes = 0L;
+        for (BufferWithChannel bufferWithChannel : bufferWithChannels) {
+            totalBytes +=
+                    bufferWithChannel.getBuffer().readableBytes()
+                            + BufferReaderWriterUtil.HEADER_LENGTH;
+        }
+        return totalBytes;
+    }
+
     private void addReadBuffer(Buffer buffer, List<Buffer> buffersRead) {
         int numBytes = buffer.readableBytes();
         MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
@@ -280,7 +321,7 @@ class PartitionedFileWriteReadTest {
         int bufferSize = 1024;
         int numSubpartitions = 2;
         int targetSubpartition = 1;
-        PartitionedFile partitionedFile = createPartitionedFile();
+        PartitionedFile partitionedFile = createEmptyPartitionedFile();
 
         List<Buffer>[] buffersRead = new List[numSubpartitions];
         for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
@@ -306,6 +347,74 @@ class PartitionedFileWriteReadTest {
         IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
     }
 
+    /**
+     * For <a
+     * 
href="https://issues.apache.org/jira/projects/FLINK/issues/FLINK-32027">FLINK-32027</a>.
+     */
+    @Test
+    void testMultipleThreadGetIndexEntry() throws Exception {
+        final int numSubpartitions = 5;
+        final int bufferSize = 1024;
+        final int numBuffers = 100;
+        final int numRegions = 10;
+
+        List<Buffer>[] buffersWritten = new List[numSubpartitions];
+        List<Buffer>[] buffersRead = new List[numSubpartitions];
+        List<Tuple2<Long, Long>>[] regionStat = new List[numSubpartitions];
+        for (int subpartition = 0; subpartition < numSubpartitions; 
++subpartition) {
+            buffersWritten[subpartition] = new ArrayList<>();
+            buffersRead[subpartition] = new ArrayList<>();
+            regionStat[subpartition] = new ArrayList<>();
+        }
+
+        PartitionedFile partitionedFile =
+                createPartitionedFile(
+                        numSubpartitions,
+                        bufferSize,
+                        numBuffers,
+                        numRegions,
+                        buffersWritten,
+                        regionStat,
+                        createPartitionedFileWriter(
+                                numSubpartitions,
+                                PartitionedFile.INDEX_ENTRY_SIZE * 
numSubpartitions,
+                                PartitionedFile.INDEX_ENTRY_SIZE * 
numSubpartitions));
+
+        FileChannel dataFileChannel = 
openFileChannel(partitionedFile.getDataFilePath());
+        FileChannel indexFileChannel = 
openFileChannel(partitionedFile.getIndexFilePath());
+
+        CheckedThread[] readers = new CheckedThread[numSubpartitions];
+        for (int i = 0; i < numSubpartitions; i++) {
+            final int subpartition = i;
+            readers[i] =
+                    new CheckedThread() {
+                        @Override
+                        public void go() throws Exception {
+                            ByteBuffer indexEntryBuffer = 
createAndConfigIndexEntryBuffer();
+                            for (int region = 0; region < numRegions; 
region++) {
+                                partitionedFile.getIndexEntry(
+                                        indexFileChannel, indexEntryBuffer, 
region, subpartition);
+                                long offset = indexEntryBuffer.getLong();
+                                long regionBytes = indexEntryBuffer.getLong();
+                                assertThat(offset)
+                                        
.isEqualTo(regionStat[subpartition].get(region).f0);
+                                assertThat(regionBytes)
+                                        
.isEqualTo(regionStat[subpartition].get(region).f1);
+                            }
+                        }
+                    };
+        }
+
+        for (CheckedThread reader : readers) {
+            reader.start();
+        }
+        for (CheckedThread reader : readers) {
+            reader.sync();
+        }
+
+        IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
+    }
+
     private FileChannel openFileChannel(Path path) throws IOException {
         return FileChannel.open(path, StandardOpenOption.READ);
     }
@@ -314,14 +423,26 @@ class PartitionedFileWriteReadTest {
         return Collections.singletonList(new BufferWithChannel(buffer, 
channelIndex));
     }
 
-    private PartitionedFile createPartitionedFile() throws IOException {
+    private PartitionedFile createEmptyPartitionedFile() throws IOException {
         PartitionedFileWriter partitionedFileWriter = 
createPartitionedFileWriter(2);
         return partitionedFileWriter.finish();
     }
 
     private PartitionedFileWriter createPartitionedFileWriter(int 
numSubpartitions)
             throws IOException {
-        return new PartitionedFileWriter(numSubpartitions, 640, 
tempPath.toString());
+        return createPartitionedFileWriter(numSubpartitions, 640);
+    }
+
+    private PartitionedFileWriter createPartitionedFileWriter(
+            int numSubpartitions, int minIndexBufferSize, int 
maxIndexBufferSize)
+            throws IOException {
+        return new PartitionedFileWriter(
+                numSubpartitions, minIndexBufferSize, maxIndexBufferSize, 
tempPath.toString());
+    }
+
+    private PartitionedFileWriter createPartitionedFileWriter(
+            int numSubpartitions, int maxIndexBufferSize) throws IOException {
+        return new PartitionedFileWriter(numSubpartitions, maxIndexBufferSize, 
tempPath.toString());
     }
 
     private PartitionedFileWriter createAndFinishPartitionedFileWriter() 
throws IOException {

Reply via email to