This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 033b4f3b9c702d9841e481428acf4bc7ff7a194f Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Dec 2 23:57:19 2020 +0100 [FLINK-19681][checkpointing] Resume consumption when receiving different upstream signals Fixes the hang observed in 1/12 unaligned checkpoint (UC) tests. --- .../streaming/runtime/io/AlternatingController.java | 3 ++- .../runtime/io/AlternatingControllerTest.java | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index f943638..90b79c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -92,9 +92,10 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll return maybeTimedOut; } else { - // TODO: add unit test for this alignedController.resumeConsumption(channelInfo); } + } else if (!barrier.getCheckpointOptions().isUnalignedCheckpoint() && activeController == unalignedController) { + alignedController.resumeConsumption(channelInfo); } return Optional.empty(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java index b12a0ad..7ecf8cb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java @@ -68,6 +68,27 @@ import static org.junit.Assert.assertFalse; */ public class AlternatingControllerTest { + @Test + public void testChannelUnblockedAfterDifferentBarriers() throws Exception { + 
CheckpointedInputGate gate = buildGate(new ValidatingCheckpointHandler(), 3); + long barrierId = 1L; + long ts = System.currentTimeMillis(); + long timeout = 10; + + send(barrier(barrierId, ts, unaligned(getDefault())), 0, gate); + + TestInputChannel acChannel = (TestInputChannel) gate.getChannel(1); + acChannel.setBlocked(true); + send(barrier(barrierId, ts, alignedWithTimeout(getDefault(), Integer.MAX_VALUE)), acChannel.getChannelIndex(), gate); + assertFalse(acChannel.isBlocked()); + + Thread.sleep(timeout); + TestInputChannel acChannelWithTimeout = (TestInputChannel) gate.getChannel(2); + acChannelWithTimeout.setBlocked(true); + send(barrier(barrierId, ts, alignedWithTimeout(getDefault(), timeout)), acChannelWithTimeout.getChannelIndex(), gate); + assertFalse(acChannelWithTimeout.isBlocked()); + } + /** * Upon subsuming (or canceling) a checkpoint, channels should be notified regardless of whether UC controller is * currently being used or not. Otherwise, channels may not capture in-flight buffers.
