This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fe12c9761f6d85073bd5d44b107b18117252d91
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Wed Jul 31 16:15:30 2019 +0300

    [hotfix][network] Simplify ResultPartitionFactory.createSubpartitions based 
on ResultPartitionType.isBlocking
---
 .../network/partition/ResultPartitionFactory.java  | 27 +++++++++-------------
 .../partition/ResultPartitionFactoryTest.java      | 14 +++++++++++
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 0656e6e..4933a4e 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -133,22 +133,17 @@ public class ResultPartitionFactory {
                        BoundedBlockingSubpartitionType 
blockingSubpartitionType,
                        ResultSubpartition[] subpartitions) {
                // Create the subpartitions.
-               switch (type) {
-                       case BLOCKING:
-                       case BLOCKING_PERSISTENT:
-                               
initializeBoundedBlockingPartitions(subpartitions, partition, 
blockingSubpartitionType, networkBufferSize, channelManager);
-                               break;
-
-                       case PIPELINED:
-                       case PIPELINED_BOUNDED:
-                               for (int i = 0; i < subpartitions.length; i++) {
-                                       subpartitions[i] = new 
PipelinedSubpartition(i, partition);
-                               }
-
-                               break;
-
-                       default:
-                               throw new IllegalArgumentException("Unsupported 
result partition type.");
+               if (type.isBlocking()) {
+                       initializeBoundedBlockingPartitions(
+                               subpartitions,
+                               partition,
+                               blockingSubpartitionType,
+                               networkBufferSize,
+                               channelManager);
+               } else {
+                       for (int i = 0; i < subpartitions.length; i++) {
+                               subpartitions[i] = new PipelinedSubpartition(i, 
partition);
+                       }
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 1c8591f..653c7f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -32,6 +32,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -58,6 +60,18 @@ public class ResultPartitionFactoryTest extends TestLogger {
        }
 
        @Test
+       public void testBoundedBlockingSubpartitionsCreated() {
+               final ResultPartition resultPartition = 
createResultPartition(false, ResultPartitionType.BLOCKING);
+               Arrays.stream(resultPartition.subpartitions).forEach(sp -> 
assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
+       }
+
+       @Test
+       public void testPipelinedSubpartitionsCreated() {
+               final ResultPartition resultPartition = 
createResultPartition(false, ResultPartitionType.PIPELINED);
+               Arrays.stream(resultPartition.subpartitions).forEach(sp -> 
assertThat(sp, instanceOf(PipelinedSubpartition.class)));
+       }
+
+       @Test
        public void testConsumptionOnReleaseForced() {
                final ResultPartition resultPartition = 
createResultPartition(true, ResultPartitionType.BLOCKING);
                assertThat(resultPartition, 
instanceOf(ReleaseOnConsumptionResultPartition.class));

Reply via email to