This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63443aec09ece8596321328273c1e431e5029c4d
Author: Weijie Guo <[email protected]>
AuthorDate: Tue May 9 16:08:36 2023 +0800

    [FLINK-32027][runtime] Fix the potential concurrent reading bug of index file for SortMergeShuffle.
    
    This closes #22549
---
 .../io/network/partition/PartitionedFile.java      |   6 +-
 .../network/partition/PartitionedFileWriter.java   |  10 +-
 .../partition/PartitionedFileWriteReadTest.java    | 201 +++++++++++++++++----
 3 files changed, 174 insertions(+), 43 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index fee4b313fc2..0630ec8dabd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -142,8 +142,10 @@ public class PartitionedFile {
                 target.put(indexEntryCache.get((int) indexEntryOffset + i));
             }
         } else {
-            indexFile.position(indexEntryOffset);
-            BufferReaderWriterUtil.readByteBufferFully(indexFile, target);
+            synchronized (indexFilePath) {
+                indexFile.position(indexEntryOffset);
+                BufferReaderWriterUtil.readByteBufferFully(indexFile, target);
+            }
         }
         target.flip();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
index d5b738e045b..5168d1097d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriter.java
@@ -18,6 +18,7 @@

 package org.apache.flink.runtime.io.network.partition;

+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
@@ -104,6 +105,13 @@ public class PartitionedFileWriter implements AutoCloseable {

     public PartitionedFileWriter(int numSubpartitions, int maxIndexBufferSize, String basePath)
             throws IOException {
+        this(numSubpartitions, MIN_INDEX_BUFFER_SIZE, maxIndexBufferSize, basePath);
+    }
+
+    @VisibleForTesting
+    PartitionedFileWriter(
+            int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize, String basePath)
+            throws IOException {
         checkArgument(numSubpartitions > 0, "Illegal number of subpartitions.");
         checkArgument(maxIndexBufferSize > 0, "Illegal maximum index cache size.");
         checkArgument(basePath != null, "Base path must not be null.");
@@ -115,7 +123,7 @@ public class PartitionedFileWriter implements AutoCloseable {
         this.dataFilePath = new File(basePath + PartitionedFile.DATA_FILE_SUFFIX).toPath();
         this.indexFilePath = new File(basePath + PartitionedFile.INDEX_FILE_SUFFIX).toPath();

-        this.indexBuffer = ByteBuffer.allocate(MIN_INDEX_BUFFER_SIZE);
+        this.indexBuffer = ByteBuffer.allocate(minIndexBufferSize);
         BufferReaderWriterUtil.configureByteBuffer(indexBuffer);

         this.dataFileChannel = openFileChannel(dataFilePath);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index 5883c242542..b7be613ce9c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -18,8 +18,10 @@

 package org.apache.flink.runtime.io.network.partition;

+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -59,51 +61,25 @@ class PartitionedFileWriteReadTest {
         int bufferSize = 1024;
         int numBuffers = 1000;
         int numRegions = 10;
-        Random random = new Random(1111);

         List<Buffer>[] buffersWritten = new List[numSubpartitions];
         List<Buffer>[] buffersRead = new List[numSubpartitions];
-        List<BufferWithChannel>[] regionBuffers = new List[numSubpartitions];
+        List<Tuple2<Long, Long>>[] regionStat = new List[numSubpartitions];
         for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
             buffersWritten[subpartition] = new ArrayList<>();
             buffersRead[subpartition] = new ArrayList<>();
-            regionBuffers[subpartition] = new ArrayList<>();
+            regionStat[subpartition] = new ArrayList<>();
         }

-        PartitionedFileWriter fileWriter = createPartitionedFileWriter(numSubpartitions);
-        for (int region = 0; region < numRegions; ++region) {
-            boolean isBroadcastRegion = random.nextBoolean();
-            fileWriter.startNewRegion(isBroadcastRegion);
-
-            for (int i = 0; i < numBuffers; ++i) {
-                Buffer buffer = createBuffer(random, bufferSize);
-                if (isBroadcastRegion) {
-                    for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
-                        buffersWritten[subpartition].add(buffer);
-                        regionBuffers[subpartition].add(
-                                new BufferWithChannel(buffer, subpartition));
-                    }
-                } else {
-                    int subpartition = random.nextInt(numSubpartitions);
-                    buffersWritten[subpartition].add(buffer);
-                    regionBuffers[subpartition].add(new BufferWithChannel(buffer, subpartition));
-                }
-            }
-
-            int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
-            for (int index = 0; index < numSubpartitions; ++index) {
-                int subpartition = writeOrder[index];
-                fileWriter.writeBuffers(regionBuffers[subpartition]);
-                if (isBroadcastRegion) {
-                    break;
-                }
-            }
-
-            for (int index = 0; index < numSubpartitions; ++index) {
-                regionBuffers[index].clear();
-            }
-        }
-        PartitionedFile partitionedFile = fileWriter.finish();
+        PartitionedFile partitionedFile =
+                createPartitionedFile(
+                        numSubpartitions,
+                        bufferSize,
+                        numBuffers,
+                        numRegions,
+                        buffersWritten,
+                        regionStat,
+                        createPartitionedFileWriter(numSubpartitions));

         FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
         FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
@@ -135,6 +111,71 @@ class PartitionedFileWriteReadTest {
         }
     }

+    private PartitionedFile createPartitionedFile(
+            int numSubpartitions,
+            int bufferSize,
+            int numBuffers,
+            int numRegions,
+            List<Buffer>[] buffersWritten,
+            List<Tuple2<Long, Long>>[] regionStat,
+            PartitionedFileWriter fileWriter)
+            throws IOException {
+        Random random = new Random(1111);
+        long currentOffset = 0L;
+        for (int region = 0; region < numRegions; ++region) {
+            boolean isBroadcastRegion = random.nextBoolean();
+            fileWriter.startNewRegion(isBroadcastRegion);
+            List<BufferWithChannel>[] bufferWithChannels = new List[numSubpartitions];
+            for (int i = 0; i < numSubpartitions; i++) {
+                bufferWithChannels[i] = new ArrayList<>();
+            }
+
+            for (int i = 0; i < numBuffers; ++i) {
+                Buffer buffer = createBuffer(random, bufferSize);
+                if (isBroadcastRegion) {
+                    for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+                        buffersWritten[subpartition].add(buffer);
+                        bufferWithChannels[subpartition].add(
+                                new BufferWithChannel(buffer, subpartition));
+                    }
+                } else {
+                    int subpartition = random.nextInt(numSubpartitions);
+                    buffersWritten[subpartition].add(buffer);
+                    bufferWithChannels[subpartition].add(
+                            new BufferWithChannel(buffer, subpartition));
+                }
+            }
+
+            int[] writeOrder = DataBufferTest.getRandomSubpartitionOrder(numSubpartitions);
+            for (int index = 0; index < numSubpartitions; ++index) {
+                int subpartition = writeOrder[index];
+                fileWriter.writeBuffers(bufferWithChannels[subpartition]);
+                long totalBytes = getTotalBytes(bufferWithChannels[subpartition]);
+                if (isBroadcastRegion) {
+                    for (int j = 0; j < numSubpartitions; j++) {
+                        regionStat[j].add(Tuple2.of(currentOffset, totalBytes));
+                    }
+                    currentOffset += totalBytes;
+                    break;
+                } else {
+                    regionStat[subpartition].add(Tuple2.of(currentOffset, totalBytes));
+                    currentOffset += totalBytes;
+                }
+            }
+        }
+        return fileWriter.finish();
+    }
+
+    private static long getTotalBytes(List<BufferWithChannel> bufferWithChannels) {
+        long totalBytes = 0L;
+        for (BufferWithChannel bufferWithChannel : bufferWithChannels) {
+            totalBytes +=
+                    bufferWithChannel.getBuffer().readableBytes()
+                            + BufferReaderWriterUtil.HEADER_LENGTH;
+        }
+        return totalBytes;
+    }
+
     private void addReadBuffer(Buffer buffer, List<Buffer> buffersRead) {
         int numBytes = buffer.readableBytes();
         MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
@@ -280,7 +321,7 @@ class PartitionedFileWriteReadTest {
         int bufferSize = 1024;
         int numSubpartitions = 2;
         int targetSubpartition = 1;
-        PartitionedFile partitionedFile = createPartitionedFile();
+        PartitionedFile partitionedFile = createEmptyPartitionedFile();

         List<Buffer>[] buffersRead = new List[numSubpartitions];
         for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
@@ -306,6 +347,74 @@ class PartitionedFileWriteReadTest {
         IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
     }

+    /**
+     * For <a
+     * href="https://issues.apache.org/jira/projects/FLINK/issues/FLINK-32027">FLINK-32027</a>.
+     */
+    @Test
+    void testMultipleThreadGetIndexEntry() throws Exception {
+        final int numSubpartitions = 5;
+        final int bufferSize = 1024;
+        final int numBuffers = 100;
+        final int numRegions = 10;
+
+        List<Buffer>[] buffersWritten = new List[numSubpartitions];
+        List<Buffer>[] buffersRead = new List[numSubpartitions];
+        List<Tuple2<Long, Long>>[] regionStat = new List[numSubpartitions];
+        for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
+            buffersWritten[subpartition] = new ArrayList<>();
+            buffersRead[subpartition] = new ArrayList<>();
+            regionStat[subpartition] = new ArrayList<>();
+        }
+
+        PartitionedFile partitionedFile =
+                createPartitionedFile(
+                        numSubpartitions,
+                        bufferSize,
+                        numBuffers,
+                        numRegions,
+                        buffersWritten,
+                        regionStat,
+                        createPartitionedFileWriter(
+                                numSubpartitions,
+                                PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions,
+                                PartitionedFile.INDEX_ENTRY_SIZE * numSubpartitions));
+
+        FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
+        FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
+
+        CheckedThread[] readers = new CheckedThread[numSubpartitions];
+        for (int i = 0; i < numSubpartitions; i++) {
+            final int subpartition = i;
+            readers[i] =
+                    new CheckedThread() {
+                        @Override
+                        public void go() throws Exception {
+                            ByteBuffer indexEntryBuffer = createAndConfigIndexEntryBuffer();
+                            for (int region = 0; region < numRegions; region++) {
+                                partitionedFile.getIndexEntry(
+                                        indexFileChannel, indexEntryBuffer, region, subpartition);
+                                long offset = indexEntryBuffer.getLong();
+                                long regionBytes = indexEntryBuffer.getLong();
+                                assertThat(offset)
+                                        .isEqualTo(regionStat[subpartition].get(region).f0);
+                                assertThat(regionBytes)
+                                        .isEqualTo(regionStat[subpartition].get(region).f1);
+                            }
+                        }
+                    };
+        }
+
+        for (CheckedThread reader : readers) {
+            reader.start();
+        }
+        for (CheckedThread reader : readers) {
+            reader.sync();
+        }
+
+        IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
+    }
+
     private FileChannel openFileChannel(Path path) throws IOException {
         return FileChannel.open(path, StandardOpenOption.READ);
     }
@@ -314,14 +423,26 @@ class PartitionedFileWriteReadTest {
         return Collections.singletonList(new BufferWithChannel(buffer, channelIndex));
     }

-    private PartitionedFile createPartitionedFile() throws IOException {
+    private PartitionedFile createEmptyPartitionedFile() throws IOException {
         PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
         return partitionedFileWriter.finish();
     }

     private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions)
             throws IOException {
-        return new PartitionedFileWriter(numSubpartitions, 640, tempPath.toString());
+        return createPartitionedFileWriter(numSubpartitions, 640);
+    }
+
+    private PartitionedFileWriter createPartitionedFileWriter(
+            int numSubpartitions, int minIndexBufferSize, int maxIndexBufferSize)
+            throws IOException {
+        return new PartitionedFileWriter(
+                numSubpartitions, minIndexBufferSize, maxIndexBufferSize, tempPath.toString());
+    }
+
+    private PartitionedFileWriter createPartitionedFileWriter(
+            int numSubpartitions, int maxIndexBufferSize) throws IOException {
+        return new PartitionedFileWriter(numSubpartitions, maxIndexBufferSize, tempPath.toString());
     }

     private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
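
The PartitionedFile.java hunk above makes the position() call and the following read on the shared index FileChannel atomic by synchronizing on indexFilePath; the new testMultipleThreadGetIndexEntry test exercises exactly that path from several CheckedThreads. Below is a minimal, self-contained JDK sketch of the same pattern. It is not Flink code: the class SharedChannelReadSketch, the constant ENTRY_SIZE, and the helper readEntry are illustrative names only. Without the synchronized block, a concurrent reader can move the channel position between position() and read() and consume bytes from the wrong entry.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SharedChannelReadSketch {

    private static final int ENTRY_SIZE = 16; // e.g. two longs per index entry
    private static final Object lock = new Object(); // shared lock, like indexFilePath in the fix

    // Positional read guarded by a shared lock: position() plus read() become atomic
    // with respect to other readers of the same channel.
    static ByteBuffer readEntry(FileChannel channel, int entryIndex) throws IOException {
        ByteBuffer target = ByteBuffer.allocate(ENTRY_SIZE);
        synchronized (lock) {
            channel.position((long) entryIndex * ENTRY_SIZE);
            while (target.hasRemaining() && channel.read(target) >= 0) {
                // keep reading until the entry is fully loaded
            }
        }
        target.flip();
        return target;
    }

    public static void main(String[] args) throws Exception {
        // Write a small file of sequential long pairs so each entry is self-identifying.
        Path file = Files.createTempFile("index", ".bin");
        try (FileChannel out = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ByteBuffer buffer = ByteBuffer.allocate(10 * ENTRY_SIZE);
            for (int i = 0; i < 10; i++) {
                buffer.putLong(i).putLong(i);
            }
            buffer.flip();
            while (buffer.hasRemaining()) {
                out.write(buffer);
            }
        }

        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            Thread[] readers = new Thread[4];
            for (int t = 0; t < readers.length; t++) {
                readers[t] =
                        new Thread(
                                () -> {
                                    try {
                                        for (int entry = 0; entry < 10; entry++) {
                                            ByteBuffer read = readEntry(in, entry);
                                            if (read.getLong() != entry || read.getLong() != entry) {
                                                throw new IllegalStateException(
                                                        "Read entry from wrong offset");
                                            }
                                        }
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                });
                readers[t].start();
            }
            for (Thread reader : readers) {
                reader.join();
            }
        } finally {
            Files.deleteIfExists(file);
        }
    }
}

Dropping the synchronized block in readEntry reintroduces the race the commit fixes, which is what the added test reproduces against the real PartitionedFile#getIndexEntry.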
