This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 22f87ee9761b06eeccf8c5adcbd9f4aa96803302 Author: Zhijiang <[email protected]> AuthorDate: Thu May 16 16:53:51 2019 +0800 [hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition It is necessary for flip1 to make sure the PartitionNotFoundException would be thrown by LocalInputChannel#requestSubpartition if the partition was not registered in ResultPartitionManager before. So a new unit test is added to cover this case. --- .../partition/consumer/SingleInputGate.java | 5 ++ .../partition/consumer/LocalInputChannelTest.java | 72 ++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 6c23698..63504bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -410,6 +410,11 @@ public class SingleInputGate extends InputGate { } } + @VisibleForTesting + Timer getRetriggerLocalRequestTimer() { + return retriggerLocalRequestTimer; + } + @Override public void close() throws IOException { boolean released = false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 505f792..a3bc696 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -39,6 +39,7 @@ import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; +import org.hamcrest.Matchers; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -60,6 +61,8 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -247,6 +250,75 @@ public class LocalInputChannelTest { } /** + * Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException} + * if the result partition was not registered in {@link ResultPartitionManager} and no backoff. + */ + @Test + public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(1); + final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager()); + + try { + localChannel.requestSubpartition(0); + + fail("Should throw a PartitionNotFoundException."); + } catch (PartitionNotFoundException notFound) { + assertThat(localChannel.getPartitionId(), Matchers.is(notFound.getPartitionId())); + } + } + + /** + * Tests that {@link SingleInputGate#retriggerPartitionRequest(IntermediateResultPartitionID)} is triggered + * after {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException} + * within backoff. + */ + @Test + public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception { + final SingleInputGate inputGate = createSingleInputGate(1); + final LocalInputChannel localChannel = createLocalInputChannel( + inputGate, new ResultPartitionManager(), 1, 1); + + inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel); + localChannel.requestSubpartition(0); + + // The timer should be initialized at the first time of retriggering partition request. + assertNotNull(inputGate.getRetriggerLocalRequestTimer()); + } + + /** + * Tests that {@link LocalInputChannel#retriggerSubpartitionRequest(Timer, int)} would throw + * {@link PartitionNotFoundException} which is set onto the input channel then. + */ + @Test + public void testChannelErrorWhileRetriggeringRequest() { + final SingleInputGate inputGate = createSingleInputGate(1); + final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager()); + + final Timer timer = new Timer(true) { + @Override + public void schedule(TimerTask task, long delay) { + task.run(); + + try { + localChannel.checkError(); + + fail("Should throw a PartitionNotFoundException."); + } catch (PartitionNotFoundException notFound) { + assertThat(localChannel.partitionId, Matchers.is(notFound.getPartitionId())); + } catch (IOException ex) { + fail("Should throw a PartitionNotFoundException."); + } + } + }; + + try { + localChannel.retriggerSubpartitionRequest(timer, 0); + } finally { + timer.cancel(); + } + } + + /** * Verifies that concurrent release via the SingleInputGate and re-triggering * of a partition request works smoothly. *
