This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4a2f3a15903ca365c14368b34b30a6234a51aa5e Author: Weijie Guo <[email protected]> AuthorDate: Thu Jul 28 13:59:55 2022 +0800 [FLINK-27908] ResultPartitionFactory also supports HYBRID type. This closes #20371 --- .../network/partition/ResultPartitionFactory.java | 22 ++++++++++++++++++++++ .../io/network/partition/ResultPartitionType.java | 2 +- .../partition/ResultPartitionFactoryTest.java | 16 ++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index e0c6bba9a05..6d47bb3c047 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition; +import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration; import org.apache.flink.runtime.shuffle.NettyShuffleUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; @@ -213,6 +215,26 @@ public class ResultPartitionFactory { partition = blockingPartition; } + } else if (type == ResultPartitionType.HYBRID) { + partition = + new HsResultPartition( + taskNameWithSubtaskAndId, + partitionIndex, + id, + type, + subpartitions.length, + maxParallelism, + batchShuffleReadBufferPool, + batchShuffleReadIOExecutor, + partitionManager, + channelManager.createChannel().getPath(), + networkBufferSize, + HybridShuffleConfiguration.builder( + numberOfSubpartitions, + batchShuffleReadBufferPool.getNumBuffersPerRequest()) + .build(), + bufferCompressor, + bufferPoolFactory); } else { throw new IllegalArgumentException("Unrecognized ResultPartitionType: " + type); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java index 0cb6eb61341..ee341f50535 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java @@ -89,7 +89,7 @@ public enum ResultPartitionType { * * <p>Hybrid partitions can be consumed any time, whether fully produced or not. */ - HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER); + HYBRID(false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER); /** Does this partition use a limited number of (network) buffers? */ private final boolean isBounded; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java index f0b31679f37..cd22abb953e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition; import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; @@ -82,6 +83,12 @@ public class ResultPartitionFactoryTest extends TestLogger { assertTrue(resultPartition instanceof SortMergeResultPartition); } + @Test + public void testHybridResultPartitionCreated() { + ResultPartition resultPartition = createResultPartition(ResultPartitionType.HYBRID); + assertTrue(resultPartition instanceof HsResultPartition); + } + @Test public void testNoReleaseOnConsumptionForBoundedBlockingPartition() { final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING); @@ -101,6 +108,15 @@ public class ResultPartitionFactoryTest extends TestLogger { assertFalse(resultPartition.isReleased()); } + @Test + public void testNoReleaseOnConsumptionForHybridPartition() { + final ResultPartition resultPartition = createResultPartition(ResultPartitionType.HYBRID); + + resultPartition.onConsumedSubpartition(0); + + assertFalse(resultPartition.isReleased()); + } + private static ResultPartition createResultPartition(ResultPartitionType partitionType) { return createResultPartition(partitionType, Integer.MAX_VALUE); }
