This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4a2f3a15903ca365c14368b34b30a6234a51aa5e
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Jul 28 13:59:55 2022 +0800

    [FLINK-27908] ResultPartitionFactory also supports HYBRID type.
    
    This closes #20371
---
 .../network/partition/ResultPartitionFactory.java  | 22 ++++++++++++++++++++++
 .../io/network/partition/ResultPartitionType.java  |  2 +-
 .../partition/ResultPartitionFactoryTest.java      | 16 ++++++++++++++++
 3 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index e0c6bba9a05..6d47bb3c047 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition;
+import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
 import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -213,6 +215,26 @@ public class ResultPartitionFactory {
 
                 partition = blockingPartition;
             }
+        } else if (type == ResultPartitionType.HYBRID) {
+            partition =
+                    new HsResultPartition(
+                            taskNameWithSubtaskAndId,
+                            partitionIndex,
+                            id,
+                            type,
+                            subpartitions.length,
+                            maxParallelism,
+                            batchShuffleReadBufferPool,
+                            batchShuffleReadIOExecutor,
+                            partitionManager,
+                            channelManager.createChannel().getPath(),
+                            networkBufferSize,
+                            HybridShuffleConfiguration.builder(
+                                            numberOfSubpartitions,
+                                            batchShuffleReadBufferPool.getNumBuffersPerRequest())
+                                    .build(),
+                            bufferCompressor,
+                            bufferPoolFactory);
         } else {
             throw new IllegalArgumentException("Unrecognized ResultPartitionType: " + type);
         }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 0cb6eb61341..ee341f50535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -89,7 +89,7 @@ public enum ResultPartitionType {
      *
      * <p>Hybrid partitions can be consumed any time, whether fully produced or not.
      */
-    HYBRID(true, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
+    HYBRID(false, false, ConsumingConstraint.CAN_BE_PIPELINED, ReleaseBy.SCHEDULER);
 
     /** Does this partition use a limited number of (network) buffers? */
     private final boolean isBounded;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index f0b31679f37..cd22abb953e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition;
 import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
@@ -82,6 +83,12 @@ public class ResultPartitionFactoryTest extends TestLogger {
         assertTrue(resultPartition instanceof SortMergeResultPartition);
     }
 
+    @Test
+    public void testHybridResultPartitionCreated() {
+        ResultPartition resultPartition = createResultPartition(ResultPartitionType.HYBRID);
+        assertTrue(resultPartition instanceof HsResultPartition);
+    }
+
     @Test
     public void testNoReleaseOnConsumptionForBoundedBlockingPartition() {
         final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
@@ -101,6 +108,15 @@ public class ResultPartitionFactoryTest extends TestLogger {
         assertFalse(resultPartition.isReleased());
     }
 
+    @Test
+    public void testNoReleaseOnConsumptionForHybridPartition() {
+        final ResultPartition resultPartition = createResultPartition(ResultPartitionType.HYBRID);
+
+        resultPartition.onConsumedSubpartition(0);
+
+        assertFalse(resultPartition.isReleased());
+    }
+
     private static ResultPartition createResultPartition(ResultPartitionType partitionType) {
         return createResultPartition(partitionType, Integer.MAX_VALUE);
     }

Reply via email to