This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7fe12c9761f6d85073bd5d44b107b18117252d91 Author: Andrey Zagrebin <[email protected]> AuthorDate: Wed Jul 31 16:15:30 2019 +0300 [hotfix][network] Simplify ResultPartitionFactory.createSubpartitions based on ResultPartitionType.isBlocking --- .../network/partition/ResultPartitionFactory.java | 27 +++++++++------------- .../partition/ResultPartitionFactoryTest.java | 14 +++++++++++ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 0656e6e..4933a4e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -133,22 +133,17 @@ public class ResultPartitionFactory { BoundedBlockingSubpartitionType blockingSubpartitionType, ResultSubpartition[] subpartitions) { // Create the subpartitions. - switch (type) { - case BLOCKING: - case BLOCKING_PERSISTENT: - initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager); - break; - - case PIPELINED: - case PIPELINED_BOUNDED: - for (int i = 0; i < subpartitions.length; i++) { - subpartitions[i] = new PipelinedSubpartition(i, partition); - } - - break; - - default: - throw new IllegalArgumentException("Unsupported result partition type."); + if (type.isBlocking()) { + initializeBoundedBlockingPartitions( + subpartitions, + partition, + blockingSubpartitionType, + networkBufferSize, + channelManager); + } else { + for (int i = 0; i < subpartitions.length; i++) { + subpartitions[i] = new PipelinedSubpartition(i, partition); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java index 1c8591f..653c7f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java @@ -32,6 +32,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.util.Arrays; + import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; @@ -58,6 +60,18 @@ public class ResultPartitionFactoryTest extends TestLogger { } @Test + public void testBoundedBlockingSubpartitionsCreated() { + final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.BLOCKING); + Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(BoundedBlockingSubpartition.class))); + } + + @Test + public void testPipelinedSubpartitionsCreated() { + final ResultPartition resultPartition = createResultPartition(false, ResultPartitionType.PIPELINED); + Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(PipelinedSubpartition.class))); + } + + @Test public void testConsumptionOnReleaseForced() { final ResultPartition resultPartition = createResultPartition(true, ResultPartitionType.BLOCKING); assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
