This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 033b4f3b9c702d9841e481428acf4bc7ff7a194f
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Dec 2 23:57:19 2020 +0100

    [FLINK-19681][checkpointing] Resume consumption when receiving different upstream signals
    
    Fixes the hang observed in 1/12 UC (unaligned checkpoint) tests.
---
 .../streaming/runtime/io/AlternatingController.java |  3 ++-
 .../runtime/io/AlternatingControllerTest.java       | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index f943638..90b79c4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -92,9 +92,10 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
                                return maybeTimedOut;
                        }
                        else {
-                               // TODO: add unit test for this
                                
alignedController.resumeConsumption(channelInfo);
                        }
+               } else if 
(!barrier.getCheckpointOptions().isUnalignedCheckpoint() && activeController == 
unalignedController) {
+                       alignedController.resumeConsumption(channelInfo);
                }
                return Optional.empty();
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index b12a0ad..7ecf8cb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -68,6 +68,27 @@ import static org.junit.Assert.assertFalse;
  */
 public class AlternatingControllerTest {
 
+       @Test
+       public void testChannelUnblockedAfterDifferentBarriers() throws 
Exception {
+               CheckpointedInputGate gate = buildGate(new 
ValidatingCheckpointHandler(), 3);
+               long barrierId = 1L;
+               long ts = System.currentTimeMillis();
+               long timeout = 10;
+
+               send(barrier(barrierId, ts, unaligned(getDefault())), 0, gate);
+
+               TestInputChannel acChannel = (TestInputChannel) 
gate.getChannel(1);
+               acChannel.setBlocked(true);
+               send(barrier(barrierId, ts, alignedWithTimeout(getDefault(), 
Integer.MAX_VALUE)), acChannel.getChannelIndex(), gate);
+               assertFalse(acChannel.isBlocked());
+
+               Thread.sleep(timeout);
+               TestInputChannel acChannelWithTimeout = (TestInputChannel) 
gate.getChannel(2);
+               acChannelWithTimeout.setBlocked(true);
+               send(barrier(barrierId, ts, alignedWithTimeout(getDefault(), 
timeout)), acChannelWithTimeout.getChannelIndex(), gate);
+               assertFalse(acChannelWithTimeout.isBlocked());
+       }
+
        /**
         * Upon subsuming (or canceling) a checkpoint, channels should be 
notified regardless of whether UC controller is
         * currently being used or not. Otherwise, channels may not capture 
in-flight buffers.

Reply via email to