This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 07ae4308459304d24deacf09a31b5c8922e939e6 Author: Piotr Nowojski <[email protected]> AuthorDate: Tue May 7 14:04:49 2019 +0200 [hotfix][test] Introduce InputGateTestBase and deduplicate test code --- .../partition/consumer/InputGateTestBase.java | 66 ++++++++++++++++++++++ .../partition/consumer/SingleInputGateTest.java | 33 +---------- .../partition/consumer/UnionInputGateTest.java | 6 +- 3 files changed, 70 insertions(+), 35 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java new file mode 100644 index 0000000..2214887 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.consumer; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; +import static org.junit.Assert.assertEquals; + +/** + * Test base for {@link InputGate}. + */ +@RunWith(Parameterized.class) +public abstract class InputGateTestBase { + + @Parameter + public boolean enableCreditBasedFlowControl; + + @Parameters(name = "Credit-based = {0}") + public static List<Boolean> parameters() { + return Arrays.asList(Boolean.TRUE, Boolean.FALSE); + } + + protected SingleInputGate createInputGate() { + return createInputGate(2); + } + + protected SingleInputGate createInputGate(int numberOfInputChannels) { + return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED); + } + + protected SingleInputGate createInputGate( + int numberOfInputChannels, ResultPartitionType partitionType) { + SingleInputGate inputGate = createSingleInputGate( + numberOfInputChannels, + partitionType, + enableCreditBasedFlowControl); + + assertEquals(partitionType, inputGate.getConsumedPartitionType()); + return inputGate; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index d82d571..6a5415a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -50,18 +50,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -80,17 +75,7 @@ import static org.mockito.Mockito.when; /** * Tests for {@link SingleInputGate}. */ -@RunWith(Parameterized.class) -public class SingleInputGateTest { - - @Parameterized.Parameter - public boolean enableCreditBasedFlowControl; - - @Parameterized.Parameters(name = "Credit-based = {0}") - public static List<Boolean> parameters() { - return Arrays.asList(Boolean.TRUE, Boolean.FALSE); - } - +public class SingleInputGateTest extends InputGateTestBase { /** * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return * value after receiving all end-of-partition events. @@ -546,22 +531,6 @@ public class SingleInputGateTest { // --------------------------------------------------------------------------------------------- - private SingleInputGate createInputGate() { - return createInputGate(2); - } - - private SingleInputGate createInputGate(int numberOfInputChannels) { - return createInputGate(numberOfInputChannels, ResultPartitionType.PIPELINED); - } - - private SingleInputGate createInputGate(int numberOfInputChannels, ResultPartitionType partitionType) { - SingleInputGate inputGate = createSingleInputGate(numberOfInputChannels, partitionType, enableCreditBasedFlowControl); - - assertEquals(partitionType, inputGate.getConsumedPartitionType()); - - return inputGate; - } - private void addUnknownInputChannel( NetworkEnvironment network, SingleInputGate inputGate, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java index 082ccec..448071f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java @@ -29,7 +29,7 @@ import static org.junit.Assert.assertTrue; /** * Tests for {@link UnionInputGate}. */ -public class UnionInputGateTest { +public class UnionInputGateTest extends InputGateTestBase { /** * Tests basic correctness of buffer-or-event interleaving and correct <code>null</code> return @@ -41,8 +41,8 @@ public class UnionInputGateTest { @Test(timeout = 120 * 1000) public void testBasicGetNextLogic() throws Exception { // Setup - final SingleInputGate ig1 = createSingleInputGate(3); - final SingleInputGate ig2 = createSingleInputGate(5); + final SingleInputGate ig1 = createInputGate(3); + final SingleInputGate ig2 = createInputGate(5); final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
