This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c04985e6ac968af2181ecaa7f0356d54fdd47848
Author: Weijie Guo <[email protected]>
AuthorDate: Wed Oct 26 11:57:22 2022 +0800

    [FLINK-28889] Fail fast when multiple consumers are not allowed but a 
subpartition is registered for consumption more than once
---
 .../partition/hybrid/HsResultPartition.java        | 16 +++++++-
 .../partition/hybrid/HsResultPartitionTest.java    | 46 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index 06a15a507c5..26aee9450ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -192,9 +193,11 @@ public class HsResultPartition extends ResultPartition {
 
         HsSubpartitionConsumer subpartitionConsumer =
                 new HsSubpartitionConsumer(availabilityListener);
+        HsConsumerId lastConsumerId = lastConsumerIds[subpartitionId];
+        checkMultipleConsumerIsAllowed(lastConsumerId, 
hybridShuffleConfiguration);
         // assign a unique id for each consumer, now it is guaranteed by the 
value that is one
         // higher than the last consumerId's id field.
-        HsConsumerId consumerId = 
HsConsumerId.newId(lastConsumerIds[subpartitionId]);
+        HsConsumerId consumerId = HsConsumerId.newId(lastConsumerId);
         lastConsumerIds[subpartitionId] = consumerId;
         HsDataView diskDataView =
                 fileDataManager.registerNewConsumer(
@@ -295,4 +298,15 @@ public class HsResultPartition extends ResultPartition {
                 throw new IllegalConfigurationException("Illegal spilling 
strategy.");
         }
     }
+
+    private void checkMultipleConsumerIsAllowed(
+            HsConsumerId lastConsumerId, HybridShuffleConfiguration 
hybridShuffleConfiguration) {
+        if (hybridShuffleConfiguration.getSpillingStrategyType()
+                == SpillingStrategyType.SELECTIVE) {
+            checkState(
+                    lastConsumerId == null,
+                    "Multiple consumer is not allowed for %s spilling strategy 
mode",
+                    hybridShuffleConfiguration.getSpillingStrategyType());
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
index 18425d5ea01..296834db22b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java
@@ -67,6 +67,7 @@ import java.util.function.BiConsumer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link HsResultPartition}. */
@@ -381,6 +382,51 @@ class HsResultPartitionTest {
         }
     }
 
+    @Test
+    void testSelectiveSpillingStrategyRegisterMultipleConsumer() throws 
Exception {
+        final int numSubpartitions = 2;
+        BufferPool bufferPool = globalPool.createBufferPool(2, 2);
+        try (HsResultPartition partition =
+                createHsResultPartition(
+                        2,
+                        bufferPool,
+                        HybridShuffleConfiguration.builder(
+                                        numSubpartitions, 
readBufferPool.getNumBuffersPerRequest())
+                                .setSpillingStrategyType(
+                                        
HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE)
+                                .build())) {
+            partition.createSubpartitionView(0, new 
NoOpBufferAvailablityListener());
+            assertThatThrownBy(
+                            () ->
+                                    partition.createSubpartitionView(
+                                            0, new 
NoOpBufferAvailablityListener()))
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining("Multiple consumer is not allowed");
+        }
+    }
+
+    @Test
+    void testFullSpillingStrategyRegisterMultipleConsumer() throws Exception {
+        final int numSubpartitions = 2;
+        BufferPool bufferPool = globalPool.createBufferPool(2, 2);
+        try (HsResultPartition partition =
+                createHsResultPartition(
+                        2,
+                        bufferPool,
+                        HybridShuffleConfiguration.builder(
+                                        numSubpartitions, 
readBufferPool.getNumBuffersPerRequest())
+                                .setSpillingStrategyType(
+                                        
HybridShuffleConfiguration.SpillingStrategyType.FULL)
+                                .build())) {
+            partition.createSubpartitionView(0, new 
NoOpBufferAvailablityListener());
+            assertThatNoException()
+                    .isThrownBy(
+                            () ->
+                                    partition.createSubpartitionView(
+                                            0, new 
NoOpBufferAvailablityListener()));
+        }
+    }
+
     private static void recordDataWritten(
             ByteBuffer record,
             Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten,

Reply via email to