This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f95c12e9a4a71930fb20feda611bd3c1584380ce
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Oct 19 15:20:02 2022 +0800

    [FLINK-28889] HsFileDataManager supports multiple consumers.
---
 .../partition/hybrid/HsFileDataManager.java        |  7 ++--
 .../partition/hybrid/HsResultPartition.java        |  4 ++-
 .../partition/hybrid/HsSubpartitionFileReader.java |  1 +
 .../hybrid/HsSubpartitionFileReaderImpl.java       | 10 ++++--
 .../partition/hybrid/HsFileDataManagerTest.java    | 24 +++++++------
 .../hybrid/HsSubpartitionFileReaderImplTest.java   | 40 ++++++++++++++++++++++
 6 files changed, 71 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 82db961be9c..a54456cabcc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -152,8 +152,10 @@ public class HsFileDataManager implements Runnable, 
BufferRecycler {
     }
 
     /** This method is only called by the result partition to create a 
subpartitionFileReader. */
-    public HsDataView registerNewSubpartition(
-            int subpartitionId, HsSubpartitionConsumerInternalOperations 
operation)
+    public HsDataView registerNewConsumer(
+            int subpartitionId,
+            HsConsumerId consumerId,
+            HsSubpartitionConsumerInternalOperations operation)
             throws IOException {
         synchronized (lock) {
             checkState(!isReleased, "HsFileDataManager is already released.");
@@ -162,6 +164,7 @@ public class HsFileDataManager implements Runnable, 
BufferRecycler {
             HsSubpartitionFileReader subpartitionReader =
                     fileReaderFactory.createFileReader(
                             subpartitionId,
+                            consumerId,
                             dataFileChannel,
                             operation,
                             dataIndex,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 5e10b773397..678fb697b29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -188,7 +188,9 @@ public class HsResultPartition extends ResultPartition {
 
         HsSubpartitionConsumer subpartitionView = new 
HsSubpartitionConsumer(availabilityListener);
         HsDataView diskDataView =
-                fileDataManager.registerNewSubpartition(subpartitionId, 
subpartitionView);
+                // TODO pass real consumerId in the next commit.
+                fileDataManager.registerNewConsumer(
+                        subpartitionId, HsConsumerId.DEFAULT, 
subpartitionView);
 
         HsDataView memoryDataView =
                 checkNotNull(memoryDataManager)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
index c87bb237660..5b4bc0158bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReader.java
@@ -57,6 +57,7 @@ public interface HsSubpartitionFileReader extends 
Comparable<HsSubpartitionFileR
     interface Factory {
         HsSubpartitionFileReader createFileReader(
                 int subpartitionId,
+                HsConsumerId consumerId,
                 FileChannel dataFileChannel,
                 HsSubpartitionConsumerInternalOperations operation,
                 HsFileDataIndex dataIndex,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index de7bd599e14..e40e917cfa1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -55,6 +55,8 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
 
     private final int subpartitionId;
 
+    private final HsConsumerId consumerId;
+
     private final FileChannel dataFileChannel;
 
     private final HsSubpartitionConsumerInternalOperations operations;
@@ -71,6 +73,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
 
     public HsSubpartitionFileReaderImpl(
             int subpartitionId,
+            HsConsumerId consumerId,
             FileChannel dataFileChannel,
             HsSubpartitionConsumerInternalOperations operations,
             HsFileDataIndex dataIndex,
@@ -78,6 +81,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
             Consumer<HsSubpartitionFileReader> fileReaderReleaser,
             ByteBuffer headerBuf) {
         this.subpartitionId = subpartitionId;
+        this.consumerId = consumerId;
         this.dataFileChannel = dataFileChannel;
         this.operations = operations;
         this.headerBuf = headerBuf;
@@ -95,12 +99,12 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
             return false;
         }
         HsSubpartitionFileReaderImpl that = (HsSubpartitionFileReaderImpl) o;
-        return subpartitionId == that.subpartitionId;
+        return subpartitionId == that.subpartitionId && 
Objects.equals(consumerId, that.consumerId);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(subpartitionId);
+        return Objects.hash(subpartitionId, consumerId);
     }
 
     /**
@@ -486,6 +490,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
         @Override
         public HsSubpartitionFileReader createFileReader(
                 int subpartitionId,
+                HsConsumerId consumerId,
                 FileChannel dataFileChannel,
                 HsSubpartitionConsumerInternalOperations operation,
                 HsFileDataIndex dataIndex,
@@ -494,6 +499,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
                 ByteBuffer headerBuffer) {
             return new HsSubpartitionFileReaderImpl(
                     subpartitionId,
+                    consumerId,
                     dataFileChannel,
                     operation,
                     dataIndex,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
index 0980688b7f2..401f95ff823 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManagerTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsConsumerId.DEFAULT;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -123,7 +124,7 @@ class HsFileDataManagerTest {
 
         assertThat(reader.readBuffers).isEmpty();
 
-        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(0, DEFAULT, 
subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -142,7 +143,7 @@ class HsFileDataManagerTest {
 
         factory.allReaders.add(reader);
 
-        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(0, DEFAULT, 
subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -174,7 +175,7 @@ class HsFileDataManagerTest {
                 });
         factory.allReaders.add(reader);
 
-        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(0, DEFAULT, 
subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -207,8 +208,8 @@ class HsFileDataManagerTest {
         factory.allReaders.add(reader1);
         factory.allReaders.add(reader2);
 
-        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
-        fileDataManager.registerNewSubpartition(1, subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(0, DEFAULT, 
subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(1, DEFAULT, 
subpartitionViewOperation);
 
         // trigger run.
         ioExecutor.trigger();
@@ -243,7 +244,7 @@ class HsFileDataManagerTest {
         reader.setFailConsumer((cause::complete));
         factory.allReaders.add(reader);
 
-        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(0, DEFAULT, 
subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -269,7 +270,7 @@ class HsFileDataManagerTest {
                 });
         factory.allReaders.add(reader);
 
-        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(0, DEFAULT, 
subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -314,7 +315,7 @@ class HsFileDataManagerTest {
                 };
         releaseThread.start();
 
-        fileDataManager.registerNewSubpartition(0, subpartitionViewOperation);
+        fileDataManager.registerNewConsumer(0, DEFAULT, 
subpartitionViewOperation);
 
         ioExecutor.trigger();
 
@@ -335,7 +336,8 @@ class HsFileDataManagerTest {
         fileDataManager.release();
         assertThatThrownBy(
                         () -> {
-                            fileDataManager.registerNewSubpartition(0, 
subpartitionViewOperation);
+                            fileDataManager.registerNewConsumer(
+                                    0, DEFAULT, subpartitionViewOperation);
                             ioExecutor.trigger();
                         })
                 .isInstanceOf(IllegalStateException.class)
@@ -358,6 +360,7 @@ class HsFileDataManagerTest {
         HsSubpartitionFileReaderImpl subpartitionFileReader =
                 new HsSubpartitionFileReaderImpl(
                         0,
+                        DEFAULT,
                         dataFileChannel,
                         subpartitionView,
                         new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
@@ -376,7 +379,7 @@ class HsFileDataManagerTest {
                     }
                 };
         factory.allReaders.add(subpartitionFileReader);
-        HsDataView diskDataView = fileDataManager.registerNewSubpartition(0, 
subpartitionView);
+        HsDataView diskDataView = fileDataManager.registerNewConsumer(0, 
DEFAULT, subpartitionView);
         subpartitionView.setDiskDataView(diskDataView);
         TestingHsDataView memoryDataView =
                 TestingHsDataView.builder()
@@ -496,6 +499,7 @@ class HsFileDataManagerTest {
             @Override
             public HsSubpartitionFileReader createFileReader(
                     int subpartitionId,
+                    HsConsumerId consumerId,
                     FileChannel dataFileChannel,
                     HsSubpartitionConsumerInternalOperations operation,
                     HsFileDataIndex dataIndex,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
index e60eff5dc09..cccf9f5d93e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImplTest.java
@@ -501,6 +501,38 @@ class HsSubpartitionFileReaderImplTest {
         checkData(subpartitionFileReader, 1, 2, 3);
     }
 
+    @Test
+    void testMultipleFileReaderOfSingleSubpartition() throws Exception {
+        TestingSubpartitionConsumerInternalOperation viewNotifier1 =
+                new TestingSubpartitionConsumerInternalOperation();
+        TestingSubpartitionConsumerInternalOperation viewNotifier2 =
+                new TestingSubpartitionConsumerInternalOperation();
+
+        HsConsumerId consumer0 = HsConsumerId.newId(null);
+        HsSubpartitionFileReaderImpl consumer1 =
+                createSubpartitionFileReader(0, consumer0, viewNotifier1);
+        HsSubpartitionFileReaderImpl consumer2 =
+                createSubpartitionFileReader(0, HsConsumerId.newId(consumer0), 
viewNotifier2);
+
+        assertThat(consumer1).isNotEqualTo(consumer2);
+
+        // write data to a single subpartition, then read these buffers by two 
consumers
+        // respectively.
+        writeDataToFile(0, 0, 1, 3);
+
+        consumer1.prepareForScheduling();
+        Queue<MemorySegment> memorySegments = createsMemorySegments(3);
+        consumer1.readBuffers(memorySegments, (ignore) -> {});
+        assertThat(memorySegments).isEmpty();
+        checkData(consumer1, 1, 2, 3);
+
+        consumer2.prepareForScheduling();
+        memorySegments = createsMemorySegments(3);
+        consumer2.readBuffers(memorySegments, (ignore) -> {});
+        assertThat(memorySegments).isEmpty();
+        checkData(consumer2, 1, 2, 3);
+    }
+
     private static void checkData(
             HsSubpartitionFileReaderImpl fileReader,
             BufferDecompressor bufferDecompressor,
@@ -530,8 +562,16 @@ class HsSubpartitionFileReaderImplTest {
 
     private HsSubpartitionFileReaderImpl createSubpartitionFileReader(
             int targetChannel, HsSubpartitionConsumerInternalOperations 
operations) {
+        return createSubpartitionFileReader(targetChannel, 
HsConsumerId.DEFAULT, operations);
+    }
+
+    private HsSubpartitionFileReaderImpl createSubpartitionFileReader(
+            int targetChannel,
+            HsConsumerId consumerId,
+            HsSubpartitionConsumerInternalOperations operations) {
         return new HsSubpartitionFileReaderImpl(
                 targetChannel,
+                consumerId,
                 dataFileChannel,
                 operations,
                 diskIndex,

Reply via email to