This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d65722f29a92440b63a98543b7ca62bd5ccd145e
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Oct 19 17:28:58 2022 +0800

    [FLINK-28889] Assign a unique identifier for each downstream consumer.
---
 .../partition/hybrid/HsResultPartition.java        | 24 +++++---
 .../partition/hybrid/HsResultPartitionTest.java    | 72 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 678fb697b29..06a15a507c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -69,6 +69,9 @@ public class HsResultPartition extends ResultPartition {
 
     private final HybridShuffleConfiguration hybridShuffleConfiguration;
 
+    /** Record the last assigned consumerId for each subpartition. */
+    private final HsConsumerId[] lastConsumerIds;
+
     private boolean hasNotifiedEndOfUserRecords;
 
     @Nullable private HsMemoryDataManager memoryDataManager;
@@ -110,6 +113,7 @@ public class HsResultPartition extends ResultPartition {
                         dataFilePath,
                         HsSubpartitionFileReaderImpl.Factory.INSTANCE,
                         hybridShuffleConfiguration);
+        this.lastConsumerIds = new HsConsumerId[numSubpartitions];
     }
 
     // Called by task thread.
@@ -186,21 +190,23 @@ public class HsResultPartition extends ResultPartition {
             throw new PartitionNotFoundException(getPartitionId());
         }
 
-        HsSubpartitionConsumer subpartitionView = new 
HsSubpartitionConsumer(availabilityListener);
+        HsSubpartitionConsumer subpartitionConsumer =
+                new HsSubpartitionConsumer(availabilityListener);
+        // Assign a unique id to each consumer; uniqueness is guaranteed by
+        // using a value one higher than the id field of the last consumerId.
+        HsConsumerId consumerId = 
HsConsumerId.newId(lastConsumerIds[subpartitionId]);
+        lastConsumerIds[subpartitionId] = consumerId;
         HsDataView diskDataView =
-                // TODO pass real consumerId in the next commit.
                 fileDataManager.registerNewConsumer(
-                        subpartitionId, HsConsumerId.DEFAULT, 
subpartitionView);
+                        subpartitionId, consumerId, subpartitionConsumer);
 
         HsDataView memoryDataView =
                 checkNotNull(memoryDataManager)
-                        // TODO pass real consumerId in the next commit.
-                        .registerNewConsumer(
-                                subpartitionId, HsConsumerId.DEFAULT, 
subpartitionView);
+                        .registerNewConsumer(subpartitionId, consumerId, 
subpartitionConsumer);
 
-        subpartitionView.setDiskDataView(diskDataView);
-        subpartitionView.setMemoryDataView(memoryDataView);
-        return subpartitionView;
+        subpartitionConsumer.setDiskDataView(diskDataView);
+        subpartitionConsumer.setMemoryDataView(memoryDataView);
+        return subpartitionConsumer;
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index 70378a453d0..18425d5ea01 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -221,6 +221,63 @@ class HsResultPartitionTest {
         }
     }
 
+    /** Test writing and reading data from a single subpartition with multiple consumers. */
+    @Test
+    void testMultipleConsumer() throws Exception {
+        final int numBuffers = 10;
+        final int numRecords = 10;
+        final int numConsumers = 2;
+        final int targetChannel = 0;
+        final Random random = new Random();
+
+        BufferPool bufferPool = globalPool.createBufferPool(numBuffers, 
numBuffers);
+        try (HsResultPartition resultPartition = createHsResultPartition(2, 
bufferPool)) {
+            List<ByteBuffer> dataWritten = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                ByteBuffer record = generateRandomData(bufferSize, random);
+                resultPartition.emitRecord(record, targetChannel);
+                dataWritten.add(record);
+            }
+            resultPartition.finish();
+
+            Tuple2[] viewAndListeners =
+                    createMultipleConsumerView(resultPartition, targetChannel, 
2);
+
+            List<List<Buffer>> dataRead = new ArrayList<>();
+            for (int i = 0; i < numConsumers; i++) {
+                dataRead.add(new ArrayList<>());
+            }
+            readData(
+                    viewAndListeners,
+                    (buffer, subpartition) -> {
+                        int numBytes = buffer.readableBytes();
+                        if (buffer.isBuffer()) {
+                            MemorySegment segment =
+                                    
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+                            segment.put(0, buffer.getNioBufferReadable(), 
numBytes);
+                            dataRead.get(subpartition)
+                                    .add(
+                                            new NetworkBuffer(
+                                                    segment,
+                                                    (buf) -> {},
+                                                    buffer.getDataType(),
+                                                    numBytes));
+                        }
+                    });
+
+            for (int i = 0; i < numConsumers; i++) {
+                assertThat(dataWritten).hasSameSizeAs(dataRead.get(i));
+                List<Buffer> readBufferList = dataRead.get(i);
+                for (int j = 0; j < dataWritten.size(); j++) {
+                    ByteBuffer bufferWritten = dataWritten.get(j);
+                    bufferWritten.rewind();
+                    Buffer bufferRead = readBufferList.get(j);
+                    
assertThat(bufferRead.getNioBufferReadable()).isEqualTo(bufferWritten);
+                }
+            }
+        }
+    }
+
     @Test
     void testClose() throws Exception {
         final int numBuffers = 1;
@@ -484,6 +541,21 @@ class HsResultPartitionTest {
         return viewAndListeners;
     }
 
+    /** Create multiple consumers and bufferAvailabilityListeners for a single subpartition. */
+    private Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[]
+            createMultipleConsumerView(
+                    HsResultPartition partition, int subpartitionId, int 
numConsumers)
+                    throws Exception {
+        Tuple2<ResultSubpartitionView, TestingBufferAvailabilityListener>[] 
viewAndListeners =
+                new Tuple2[numConsumers];
+        for (int consumer = 0; consumer < numConsumers; ++consumer) {
+            TestingBufferAvailabilityListener listener = new 
TestingBufferAvailabilityListener();
+            viewAndListeners[consumer] =
+                    Tuple2.of(partition.createSubpartitionView(subpartitionId, 
listener), listener);
+        }
+        return viewAndListeners;
+    }
+
     private static final class TestingBufferAvailabilityListener
             implements BufferAvailabilityListener {
 

Reply via email to