This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d65722f29a92440b63a98543b7ca62bd5ccd145e Author: Weijie Guo <[email protected]> AuthorDate: Wed Oct 19 17:28:58 2022 +0800 [FLINK-28889] Assign an unique identifier for each downstream consumer. --- .../partition/hybrid/HsResultPartition.java | 24 +++++--- .../partition/hybrid/HsResultPartitionTest.java | 72 ++++++++++++++++++++++ 2 files changed, 87 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index 678fb697b29..06a15a507c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -69,6 +69,9 @@ public class HsResultPartition extends ResultPartition { private final HybridShuffleConfiguration hybridShuffleConfiguration; + /** Record the last assigned consumerId for each subpartition. */ + private final HsConsumerId[] lastConsumerIds; + private boolean hasNotifiedEndOfUserRecords; @Nullable private HsMemoryDataManager memoryDataManager; @@ -110,6 +113,7 @@ public class HsResultPartition extends ResultPartition { dataFilePath, HsSubpartitionFileReaderImpl.Factory.INSTANCE, hybridShuffleConfiguration); + this.lastConsumerIds = new HsConsumerId[numSubpartitions]; } // Called by task thread. @@ -186,21 +190,23 @@ public class HsResultPartition extends ResultPartition { throw new PartitionNotFoundException(getPartitionId()); } - HsSubpartitionConsumer subpartitionView = new HsSubpartitionConsumer(availabilityListener); + HsSubpartitionConsumer subpartitionConsumer = + new HsSubpartitionConsumer(availabilityListener); + // assign a unique id for each consumer, now it is guaranteed by the value that is one + // higher than the last consumerId's id field. + HsConsumerId consumerId = HsConsumerId.newId(lastConsumerIds[subpartitionId]); + lastConsumerIds[subpartitionId] = consumerId; HsDataView diskDataView = - // TODO pass real consumerId in the next commit. fileDataManager.registerNewConsumer( - subpartitionId, HsConsumerId.DEFAULT, subpartitionView); + subpartitionId, consumerId, subpartitionConsumer); HsDataView memoryDataView = checkNotNull(memoryDataManager) - // TODO pass real consumerId in the next commit. - .registerNewConsumer( - subpartitionId, HsConsumerId.DEFAULT, subpartitionView); + .registerNewConsumer(subpartitionId, consumerId, subpartitionConsumer); - subpartitionView.setDiskDataView(diskDataView); - subpartitionView.setMemoryDataView(memoryDataView); - return subpartitionView; + subpartitionConsumer.setDiskDataView(diskDataView); + subpartitionConsumer.setMemoryDataView(memoryDataView); + return subpartitionConsumer; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java index 70378a453d0..18425d5ea01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java @@ -221,6 +221,63 @@ class HsResultPartitionTest { } } + /** Test write and read data from single subpartition with multiple consumer. */ + @Test + void testMultipleConsumer() throws Exception { + final int numBuffers = 10; + final int numRecords = 10; + final int numConsumers = 2; + final int targetChannel = 0; + final Random random = new Random(); + + BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers); + try (HsResultPartition resultPartition = createHsResultPartition(2, bufferPool)) { + List<ByteBuffer> dataWritten = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + ByteBuffer record = generateRandomData(bufferSize, random); + resultPartition.emitRecord(record, targetChannel); + dataWritten.add(record); + } + resultPartition.finish(); + + Tuple2[] viewAndListeners = + createMultipleConsumerView(resultPartition, targetChannel, 2); + + List<List<Buffer>> dataRead = new ArrayList<>(); + for (int i = 0; i < numConsumers; i++) { + dataRead.add(new ArrayList<>()); + } + readData( + viewAndListeners, + (buffer, subpartition) -> { + int numBytes = buffer.readableBytes(); + if (buffer.isBuffer()) { + MemorySegment segment = + MemorySegmentFactory.allocateUnpooledSegment(numBytes); + segment.put(0, buffer.getNioBufferReadable(), numBytes); + dataRead.get(subpartition) + .add( + new NetworkBuffer( + segment, + (buf) -> {}, + buffer.getDataType(), + numBytes)); + } + }); + + for (int i = 0; i < numConsumers; i++) { + assertThat(dataWritten).hasSameSizeAs(dataRead.get(i)); + List<Buffer> readBufferList = dataRead.get(i); + for (int j = 0; j < dataWritten.size(); j++) { + ByteBuffer bufferWritten = dataWritten.get(j); + bufferWritten.rewind(); + Buffer bufferRead = readBufferList.get(j); + assertThat(bufferRead.getNioBufferReadable()).isEqualTo(bufferWritten); + } + } + } + } + @Test void testClose() throws Exception { final int numBuffers = 1; @@ -484,6 +541,21 @@ class HsResultPartitionTest { return viewAndListeners; } + /** Create multiple consumer and bufferAvailabilityListener for single subpartition. */ + private Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] + createMultipleConsumerView( + HsResultPartition partition, int subpartitionId, int numConsumers) + throws Exception { + Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] viewAndListeners = + new Tuple2[numConsumers]; + for (int consumer = 0; consumer < numConsumers; ++consumer) { + TestingBufferAvailabilityListener listener = new TestingBufferAvailabilityListener(); + viewAndListeners[consumer] = + Tuple2.of(partition.createSubpartitionView(subpartitionId, listener), listener); + } + return viewAndListeners; + } + private static final class TestingBufferAvailabilityListener implements BufferAvailabilityListener {
