This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f95c12e9a4a71930fb20feda611bd3c1584380ce Author: Weijie Guo <[email protected]> AuthorDate: Wed Oct 19 15:20:02 2022 +0800 [FLINK-28889] HsFileDataManager supports multiple consumer. --- .../partition/hybrid/HsFileDataManager.java | 7 ++-- .../partition/hybrid/HsResultPartition.java | 4 ++- .../partition/hybrid/HsSubpartitionFileReader.java | 1 + .../hybrid/HsSubpartitionFileReaderImpl.java | 10 ++++-- .../partition/hybrid/HsFileDataManagerTest.java | 24 +++++++------ .../hybrid/HsSubpartitionFileReaderImplTest.java | 40 ++++++++++++++++++++++ 6 files changed, 71 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index 82db961be9c..a54456cabcc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -152,8 +152,10 @@ public class HsFileDataManager implements Runnable, BufferRecycler { } /** This method only called by result partition to create subpartitionFileReader. */ - public HsDataView registerNewSubpartition( - int subpartitionId, HsSubpartitionConsumerInternalOperations operation) + public HsDataView registerNewConsumer( + int subpartitionId, + HsConsumerId consumerId, + HsSubpartitionConsumerInternalOperations operation) throws IOException { synchronized (lock) { checkState(!isReleased, "HsFileDataManager is already released."); @@ -162,6 +164,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler { HsSubpartitionFileReader subpartitionReader = fileReaderFactory.createFileReader( subpartitionId, + consumerId, dataFileChannel, operation, dataIndex, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index 5e10b773397..678fb697b29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -188,7 +188,9 @@ public class HsResultPartition extends ResultPartition { HsSubpartitionConsumer subpartitionView = new HsSubpartitionConsumer(availabilityListener); HsDataView diskDataView = - fileDataManager.registerNewSubpartition(subpartitionId, subpartitionView); + // TODO pass real consumerId in the next commit. + fileDataManager.registerNewConsumer( + subpartitionId, HsConsumerId.DEFAULT, subpartitionView); HsDataView memoryDataView = checkNotNull(memoryDataManager) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java index c87bb237660..5b4bc0158bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java @@ -57,6 +57,7 @@ public interface HsSubpartitionFileReader extends Comparable<HsSubpartitionFileR interface Factory { HsSubpartitionFileReader createFileReader( int subpartitionId, + HsConsumerId consumerId, FileChannel dataFileChannel, HsSubpartitionConsumerInternalOperations operation, HsFileDataIndex dataIndex, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java index de7bd599e14..e40e917cfa1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java @@ -55,6 +55,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { private final int subpartitionId; + private final HsConsumerId consumerId; + private final FileChannel dataFileChannel; private final HsSubpartitionConsumerInternalOperations operations; @@ -71,6 +73,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { public HsSubpartitionFileReaderImpl( int subpartitionId, + HsConsumerId consumerId, FileChannel dataFileChannel, HsSubpartitionConsumerInternalOperations operations, HsFileDataIndex dataIndex, @@ -78,6 +81,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { Consumer<HsSubpartitionFileReader> fileReaderReleaser, ByteBuffer headerBuf) { this.subpartitionId = subpartitionId; + this.consumerId = consumerId; this.dataFileChannel = dataFileChannel; this.operations = operations; this.headerBuf = headerBuf; @@ -95,12 +99,12 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { return false; } HsSubpartitionFileReaderImpl that = (HsSubpartitionFileReaderImpl) o; - return subpartitionId == that.subpartitionId; + return subpartitionId == that.subpartitionId && Objects.equals(consumerId, that.consumerId); } @Override public int hashCode() { - return Objects.hash(subpartitionId); + return Objects.hash(subpartitionId, consumerId); } /** @@ -486,6 +490,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { @Override public HsSubpartitionFileReader createFileReader( int subpartitionId, + HsConsumerId consumerId, FileChannel dataFileChannel, HsSubpartitionConsumerInternalOperations operation, HsFileDataIndex dataIndex, @@ -494,6 +499,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { ByteBuffer headerBuffer) { return new HsSubpartitionFileReaderImpl( subpartitionId, + consumerId, dataFileChannel, operation, dataIndex, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java index 0980688b7f2..401f95ff823 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java @@ -52,6 +52,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId.DEFAULT; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; @@ -123,7 +124,7 @@ class HsFileDataManagerTest { assertThat(reader.readBuffers).isEmpty(); - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); ioExecutor.trigger(); @@ -142,7 +143,7 @@ class HsFileDataManagerTest { factory.allReaders.add(reader); - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); ioExecutor.trigger(); @@ -174,7 +175,7 @@ class HsFileDataManagerTest { }); factory.allReaders.add(reader); - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); ioExecutor.trigger(); @@ -207,8 +208,8 @@ class HsFileDataManagerTest { factory.allReaders.add(reader1); factory.allReaders.add(reader2); - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); - fileDataManager.registerNewSubpartition(1, subpartitionViewOperation); + fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); + fileDataManager.registerNewConsumer(1, DEFAULT, subpartitionViewOperation); // trigger run. ioExecutor.trigger(); @@ -243,7 +244,7 @@ class HsFileDataManagerTest { reader.setFailConsumer((cause::complete)); factory.allReaders.add(reader); - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); ioExecutor.trigger(); @@ -269,7 +270,7 @@ class HsFileDataManagerTest { }); factory.allReaders.add(reader); - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); ioExecutor.trigger(); @@ -314,7 +315,7 @@ class HsFileDataManagerTest { }; releaseThread.start(); - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionViewOperation); ioExecutor.trigger(); @@ -335,7 +336,8 @@ class HsFileDataManagerTest { fileDataManager.release(); assertThatThrownBy( () -> { - fileDataManager.registerNewSubpartition(0, subpartitionViewOperation); + fileDataManager.registerNewConsumer( + 0, DEFAULT, subpartitionViewOperation); ioExecutor.trigger(); }) .isInstanceOf(IllegalStateException.class) @@ -358,6 +360,7 @@ class HsFileDataManagerTest { HsSubpartitionFileReaderImpl subpartitionFileReader = new HsSubpartitionFileReaderImpl( 0, + DEFAULT, dataFileChannel, subpartitionView, new HsFileDataIndexImpl(NUM_SUBPARTITIONS), @@ -376,7 +379,7 @@ class HsFileDataManagerTest { } }; factory.allReaders.add(subpartitionFileReader); - HsDataView diskDataView = fileDataManager.registerNewSubpartition(0, subpartitionView); + HsDataView diskDataView = fileDataManager.registerNewConsumer(0, DEFAULT, subpartitionView); subpartitionView.setDiskDataView(diskDataView); TestingHsDataView memoryDataView = TestingHsDataView.builder() @@ -496,6 +499,7 @@ class HsFileDataManagerTest { @Override public HsSubpartitionFileReader createFileReader( int subpartitionId, + HsConsumerId consumerId, FileChannel dataFileChannel, HsSubpartitionConsumerInternalOperations operation, HsFileDataIndex dataIndex, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java index e60eff5dc09..cccf9f5d93e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java @@ -501,6 +501,38 @@ class HsSubpartitionFileReaderImplTest { checkData(subpartitionFileReader, 1, 2, 3); } + @Test + void testMultipleFileReaderOfSingleSubpartition() throws Exception { + TestingSubpartitionConsumerInternalOperation viewNotifier1 = + new TestingSubpartitionConsumerInternalOperation(); + TestingSubpartitionConsumerInternalOperation viewNotifier2 = + new TestingSubpartitionConsumerInternalOperation(); + + HsConsumerId consumer0 = HsConsumerId.newId(null); + HsSubpartitionFileReaderImpl consumer1 = + createSubpartitionFileReader(0, consumer0, viewNotifier1); + HsSubpartitionFileReaderImpl consumer2 = + createSubpartitionFileReader(0, HsConsumerId.newId(consumer0), viewNotifier2); + + assertThat(consumer1).isNotEqualTo(consumer2); + + // write data to a single subpartition, then read these buffers by two consumers + // respectively. + writeDataToFile(0, 0, 1, 3); + + consumer1.prepareForScheduling(); + Queue<MemorySegment> memorySegments = createsMemorySegments(3); + consumer1.readBuffers(memorySegments, (ignore) -> {}); + assertThat(memorySegments).isEmpty(); + checkData(consumer1, 1, 2, 3); + + consumer2.prepareForScheduling(); + memorySegments = createsMemorySegments(3); + consumer2.readBuffers(memorySegments, (ignore) -> {}); + assertThat(memorySegments).isEmpty(); + checkData(consumer2, 1, 2, 3); + } + private static void checkData( HsSubpartitionFileReaderImpl fileReader, BufferDecompressor bufferDecompressor, @@ -530,8 +562,16 @@ class HsSubpartitionFileReaderImplTest { private HsSubpartitionFileReaderImpl createSubpartitionFileReader( int targetChannel, HsSubpartitionConsumerInternalOperations operations) { + return createSubpartitionFileReader(targetChannel, HsConsumerId.DEFAULT, operations); + } + + private HsSubpartitionFileReaderImpl createSubpartitionFileReader( + int targetChannel, + HsConsumerId consumerId, + HsSubpartitionConsumerInternalOperations operations) { return new HsSubpartitionFileReaderImpl( targetChannel, + consumerId, dataFileChannel, operations, diskIndex,
