This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c04985e6ac968af2181ecaa7f0356d54fdd47848 Author: Weijie Guo <[email protected]> AuthorDate: Wed Oct 26 11:57:22 2022 +0800 [FLINK-28889] fail fast when multiple consumer is not allowed but register many times --- .../partition/hybrid/HsResultPartition.java | 16 +++++++- .../partition/hybrid/HsResultPartitionTest.java | 46 ++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index 06a15a507c5..26aee9450ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration.SpillingStrategyType; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.function.SupplierWithException; @@ -192,9 +193,11 @@ public class HsResultPartition extends ResultPartition { HsSubpartitionConsumer subpartitionConsumer = new HsSubpartitionConsumer(availabilityListener); + HsConsumerId lastConsumerId = lastConsumerIds[subpartitionId]; + checkMultipleConsumerIsAllowed(lastConsumerId, hybridShuffleConfiguration); // assign a unique id for each consumer, now it is guaranteed by the value that is one // higher than the last consumerId's id field. - HsConsumerId consumerId = HsConsumerId.newId(lastConsumerIds[subpartitionId]); + HsConsumerId consumerId = HsConsumerId.newId(lastConsumerId); lastConsumerIds[subpartitionId] = consumerId; HsDataView diskDataView = fileDataManager.registerNewConsumer( @@ -295,4 +298,15 @@ public class HsResultPartition extends ResultPartition { throw new IllegalConfigurationException("Illegal spilling strategy."); } } + + private void checkMultipleConsumerIsAllowed( + HsConsumerId lastConsumerId, HybridShuffleConfiguration hybridShuffleConfiguration) { + if (hybridShuffleConfiguration.getSpillingStrategyType() + == SpillingStrategyType.SELECTIVE) { + checkState( + lastConsumerId == null, + "Multiple consumer is not allowed for %s spilling strategy mode", + hybridShuffleConfiguration.getSpillingStrategyType()); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java index 18425d5ea01..296834db22b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionTest.java @@ -67,6 +67,7 @@ import java.util.function.BiConsumer; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link HsResultPartition}. */ @@ -381,6 +382,51 @@ class HsResultPartitionTest { } } + @Test + void testSelectiveSpillingStrategyRegisterMultipleConsumer() throws Exception { + final int numSubpartitions = 2; + BufferPool bufferPool = globalPool.createBufferPool(2, 2); + try (HsResultPartition partition = + createHsResultPartition( + 2, + bufferPool, + HybridShuffleConfiguration.builder( + numSubpartitions, readBufferPool.getNumBuffersPerRequest()) + .setSpillingStrategyType( + HybridShuffleConfiguration.SpillingStrategyType.SELECTIVE) + .build())) { + partition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); + assertThatThrownBy( + () -> + partition.createSubpartitionView( + 0, new NoOpBufferAvailablityListener())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Multiple consumer is not allowed"); + } + } + + @Test + void testFullSpillingStrategyRegisterMultipleConsumer() throws Exception { + final int numSubpartitions = 2; + BufferPool bufferPool = globalPool.createBufferPool(2, 2); + try (HsResultPartition partition = + createHsResultPartition( + 2, + bufferPool, + HybridShuffleConfiguration.builder( + numSubpartitions, readBufferPool.getNumBuffersPerRequest()) + .setSpillingStrategyType( + HybridShuffleConfiguration.SpillingStrategyType.FULL) + .build())) { + partition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); + assertThatNoException() + .isThrownBy( + () -> + partition.createSubpartitionView( + 0, new NoOpBufferAvailablityListener())); + } + } + private static void recordDataWritten( ByteBuffer record, Queue<Tuple2<ByteBuffer, Buffer.DataType>>[] dataWritten,
