This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c5c46abbcd8662f7162f934b16209ef3a60585a9
Author: Piotr Nowojski <[email protected]>
AuthorDate: Sun Oct 25 18:04:41 2020 +0100

    [FLINK-19681][checkpointing] Timeout aligned checkpoints based on 
checkpointStartDelay
---
 .../runtime/checkpoint/CheckpointOptions.java      |  11 +
 .../partition/BufferWritingResultPartition.java    |   3 +
 .../io/network/partition/PrioritizedDeque.java     |  22 ++
 .../partition/consumer/ChannelStatePersister.java  |  11 +-
 .../partition/consumer/CheckpointableInput.java    |   2 +
 .../partition/consumer/IndexedInputGate.java       |   7 +
 .../network/partition/consumer/InputChannel.java   |   3 +
 .../partition/consumer/RemoteInputChannel.java     |  47 ++++
 .../io/network/partition/PrioritizedDequeTest.java |  27 ++
 .../partition/consumer/RemoteInputChannelTest.java |  41 +++
 .../streaming/runtime/io/AlignedController.java    |  54 +++-
 .../runtime/io/AlternatingController.java          | 125 ++++++++-
 .../io/CheckpointBarrierBehaviourController.java   |  12 +-
 .../runtime/io/SingleCheckpointBarrierHandler.java |  70 +++--
 .../runtime/io/StreamTaskSourceInput.java          |   5 +
 .../streaming/runtime/io/UnalignedController.java  |  22 +-
 .../runtime/tasks/mailbox/MailboxProcessor.java    |   5 +
 .../runtime/io/AlignedControllerTest.java          |  29 ++
 .../runtime/io/AlternatingControllerTest.java      | 306 ++++++++++++++++++++-
 .../runtime/io/ValidatingCheckpointHandler.java    |   8 +-
 20 files changed, 754 insertions(+), 56 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index 126ba0b..b092a92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Options for performing the checkpoint.
@@ -188,4 +189,14 @@ public class CheckpointOptions implements Serializable {
                        isUnalignedCheckpoint,
                        alignmentTimeout);
        }
+
+       public CheckpointOptions toTimeouted() {
+               checkState(checkpointType == CheckpointType.CHECKPOINT);
+               return create(
+                       checkpointType,
+                       targetLocation,
+                       isExactlyOnceMode,
+                       true,
+                       0);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 2ba000a..415ed19 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -235,6 +235,9 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
        private BufferBuilder appendUnicastDataForNewRecord(
                        final ByteBuffer record,
                        final int targetSubpartition) throws IOException {
+               if (targetSubpartition < 0 || targetSubpartition >= 
unicastBufferBuilders.length) {
+                       throw new 
ArrayIndexOutOfBoundsException(targetSubpartition);
+               } // valid indices are [0, length): index == length must be rejected too
                BufferBuilder buffer = 
unicastBufferBuilders[targetSubpartition];
 
                if (buffer == null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
index dabb4a3..77108d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PrioritizedDeque.java
@@ -26,7 +26,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -142,6 +144,26 @@ public final class PrioritizedDeque<T> implements 
Iterable<T> {
        }
 
        /**
+        * Find first element matching the {@link Predicate}, remove it from 
the {@link PrioritizedDeque}
+        * and return it.
+        * @return removed element
+        */
+       public T getAndRemove(Predicate<T> preCondition) {
+               Iterator<T> iterator = deque.iterator();
+               for (int i = 0; i < deque.size(); i++) {
+                       T next = iterator.next();
+                       if (preCondition.test(next)) {
+                               if (i < numPriorityElements) {
+                                       numPriorityElements--;
+                               }
+                               iterator.remove();
+                               return next;
+                       }
+               }
+               throw new NoSuchElementException();
+       }
+
+       /**
         * Polls the first priority element or non-priority element if the 
former does not exist.
         *
         * @return the first element or null.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
index 416c956..924e30f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EventAnnouncement;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.CloseableIterator;
@@ -47,9 +48,7 @@ final class ChannelStatePersister {
 
        private long lastSeenBarrier = -1L;
 
-       /**
-        * Writer must be initialized before usage. {@link 
#startPersisting(long, List)} enforces this invariant.
-        */
+       /** Writer must be initialized before usage. {@link 
#startPersisting(long, List)} enforces this invariant. */
        private final ChannelStateWriter channelStateWriter;
 
        ChannelStatePersister(ChannelStateWriter channelStateWriter, 
InputChannelInfo channelInfo) {
@@ -97,6 +96,12 @@ final class ChannelStatePersister {
                                return Optional.of(lastSeenBarrier);
                        }
                }
+               else if (priorityEvent instanceof EventAnnouncement) {
+                       EventAnnouncement announcement = (EventAnnouncement) 
priorityEvent;
+                       if (announcement.getAnnouncedEvent() instanceof 
CheckpointBarrier) {
+                               return Optional.of(((CheckpointBarrier) 
announcement.getAnnouncedEvent()).getId());
+                       }
+               }
                return Optional.empty();
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java
index 2316a90..75a2ba6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java
@@ -44,4 +44,6 @@ public interface CheckpointableInput {
        void checkpointStopped(long cancelledCheckpointId);
 
        int getInputGateIndex();
+
+       void convertToPriorityEvent(int channelIndex, int sequenceNumber) 
throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index 3b8ecb2..6566fd3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -21,6 +21,8 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
+import java.io.IOException;
+
 /**
  * An {@link InputGate} with a specific index.
  */
@@ -53,4 +55,9 @@ public abstract class IndexedInputGate extends InputGate 
implements Checkpointab
        public void blockConsumption(InputChannelInfo channelInfo) {
                // Unused. Network stack is blocking consumption automatically 
by revoking credits.
        }
+
+       @Override
+       public void convertToPriorityEvent(int channelIndex, int 
sequenceNumber) throws IOException {
+               getChannel(channelIndex).convertToPriorityEvent(sequenceNumber);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 792392b9..e1909b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -183,6 +183,9 @@ public abstract class InputChannel {
        public void checkpointStopped(long checkpointId) {
        }
 
+       public void convertToPriorityEvent(int sequenceNumber) throws 
IOException {
+       }
+
        // 
------------------------------------------------------------------------
        // Task events
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 1992cdb..27fa58f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -494,6 +494,7 @@ public class RemoteInputChannel extends InputChannel {
 
        private SequenceBuffer announce(SequenceBuffer sequenceBuffer) throws 
IOException {
                checkState(!sequenceBuffer.buffer.isBuffer(), "Only a 
CheckpointBarrier can be announced but found %s", sequenceBuffer.buffer);
+               checkAnnouncedOnlyOnce(sequenceBuffer);
                AbstractEvent event = EventSerializer.fromBuffer(
                                sequenceBuffer.buffer,
                                getClass().getClassLoader());
@@ -504,6 +505,20 @@ public class RemoteInputChannel extends InputChannel {
                                sequenceBuffer.sequenceNumber);
        }
 
+       private void checkAnnouncedOnlyOnce(SequenceBuffer sequenceBuffer) {
+               Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
+               int count = 0;
+               while (iterator.hasNext()) {
+                       if (iterator.next().sequenceNumber == 
sequenceBuffer.sequenceNumber) {
+                               count++;
+                       }
+               }
+               checkState(
+                       count == 1,
+                       "Before enqueuing the announcement there should be 
exactly single occurrence of the buffer, but found [%d]",
+                       count);
+       }
+
        /**
         * Spills all queued buffers on checkpoint start. If barrier has 
already been received (and reordered), spill only
         * the overtaken buffers.
@@ -533,6 +548,29 @@ public class RemoteInputChannel extends InputChannel {
                }
        }
 
+       @Override
+       public void convertToPriorityEvent(int sequenceNumber) throws 
IOException {
+               boolean firstPriorityEvent;
+               synchronized (receivedBuffers) {
+                       checkState(!channelStatePersister.hasBarrierReceived());
+                       int numPriorityElementsBeforeRemoval = 
receivedBuffers.getNumPriorityElements();
+                       SequenceBuffer toPrioritize = 
receivedBuffers.getAndRemove(
+                               sequenceBuffer -> sequenceBuffer.sequenceNumber 
== sequenceNumber);
+                       checkState(lastBarrierSequenceNumber == sequenceNumber);
+                       checkState(!toPrioritize.buffer.isBuffer());
+                       checkState(
+                               numPriorityElementsBeforeRemoval == 
receivedBuffers.getNumPriorityElements(),
+                               "Attempted to convertToPriorityEvent an event 
[%s] that has already been prioritized [%s]",
+                               toPrioritize,
+                               numPriorityElementsBeforeRemoval);
+                       firstPriorityEvent = addPriorityBuffer(toPrioritize);   
// note that only position of the element is changed
+                                                                               
                                                        // converting the event 
itself would require switching the controller sooner
+               }
+               if (firstPriorityEvent) {
+                       notifyPriorityEvent(sequenceNumber);
+               }
+       }
+
        /**
         * Returns a list of buffers, checking the first n non-priority 
buffers, and skipping all events.
         */
@@ -653,5 +691,14 @@ public class RemoteInputChannel extends InputChannel {
                        this.buffer = buffer;
                        this.sequenceNumber = sequenceNumber;
                }
+
+               @Override
+               public String toString() {
+                       return String.format(
+                               "SequenceBuffer(isEvent = %s, dataType = %s, 
sequenceNumber = %s)",
+                               !buffer.isBuffer(),
+                               buffer.getDataType(),
+                               sequenceNumber);
+               }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java
index 8932c86..5cc8a3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PrioritizedDequeTest.java
@@ -19,7 +19,11 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.junit.Test;
 
+import java.util.NoSuchElementException;
+
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests PrioritizedDeque.
@@ -53,4 +57,27 @@ public class PrioritizedDequeTest {
 
                assertArrayEquals(new Integer[] { 3, 0, 1, 2 }, 
deque.asUnmodifiableCollection().toArray(new Integer[0]));
        }
+
+       @Test
+       public void testGetAndRemove() {
+               final PrioritizedDeque<Integer> deque = new 
PrioritizedDeque<>();
+
+               deque.add(0);
+               deque.add(1);
+               deque.add(2);
+               deque.add(1);
+               deque.add(3);
+
+               assertEquals(1, deque.getAndRemove(v -> v == 1).intValue());
+               assertArrayEquals(new Integer[] { 0, 2, 1, 3 }, 
deque.asUnmodifiableCollection().toArray(new Integer[0]));
+               assertEquals(1, deque.getAndRemove(v -> v == 1).intValue());
+               assertArrayEquals(new Integer[] { 0, 2, 3 }, 
deque.asUnmodifiableCollection().toArray(new Integer[0]));
+               try {
+                       int removed = deque.getAndRemove(v -> v == 1);
+                       fail(String.format("This should not happen. Item [%s] 
was removed, but it shouldn't be found", removed));
+               }
+               catch (NoSuchElementException ex) {
+                       // expected
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index c74cf4e..2954a3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -1232,6 +1232,47 @@ public class RemoteInputChannelTest {
                assertEquals(3, 
channel.getNextBuffer().get().getSequenceNumber());
        }
 
+       @Test
+       public void testGetInflightBuffersBeforeProcessingAnnouncement() throws 
Exception {
+               int bufferSize = 1;
+               int sequenceNumber = 0;
+               final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBarrier(channel, sequenceNumber++, 10);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               assertInflightBufferSizes(channel, 1, 2);
+       }
+
+       @Test
+       public void testGetInflightBuffersAfterProcessingAnnouncement() throws 
Exception {
+               int bufferSize = 1;
+               int sequenceNumber = 0;
+               final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBarrier(channel, sequenceNumber++, 10);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               assertGetNextBufferSequenceNumbers(channel, 2);
+               assertInflightBufferSizes(channel, 1, 2);
+       }
+
+       @Test
+       public void 
testGetInflightBuffersAfterProcessingAnnouncementAndBuffer() throws Exception {
+               int bufferSize = 1;
+               int sequenceNumber = 0;
+               final RemoteInputChannel channel = 
buildInputGateAndGetChannel(sequenceNumber);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBarrier(channel, sequenceNumber++, 10);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               sendBuffer(channel, sequenceNumber++, bufferSize++);
+               assertGetNextBufferSequenceNumbers(channel, 2, 0);
+               assertInflightBufferSizes(channel, 2);
+       }
+
        private void sendBarrier(RemoteInputChannel channel, int 
sequenceNumber, int alignmentTimeout) throws IOException {
                CheckpointOptions checkpointOptions = CheckpointOptions.create(
                        CHECKPOINT,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
index f3999a4..a2ef674 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlignedController.java
@@ -26,7 +26,10 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -39,13 +42,35 @@ import static 
org.apache.flink.util.Preconditions.checkState;
 public class AlignedController implements CheckpointBarrierBehaviourController 
{
        private final CheckpointableInput[] inputs;
 
+       /**
+        * {@link #blockedChannels} are the ones for which we have already 
processed {@link CheckpointBarrier}
+        * (via {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)}. 
{@link #sequenceNumberInAnnouncedChannels}
+        * on the other hand, are the ones that we have processed {@link 
#barrierAnnouncement(InputChannelInfo, CheckpointBarrier, int)}
+        * but not yet {@link #barrierReceived(InputChannelInfo, 
CheckpointBarrier)}.
+        */
        private final Map<InputChannelInfo, Boolean> blockedChannels;
+       private final Map<InputChannelInfo, Integer> 
sequenceNumberInAnnouncedChannels;
 
        public AlignedController(CheckpointableInput... inputs) {
                this.inputs = inputs;
                blockedChannels = Arrays.stream(inputs)
                        .flatMap(gate -> gate.getChannelInfos().stream())
                        .collect(Collectors.toMap(Function.identity(), info -> 
false));
+               sequenceNumberInAnnouncedChannels = new HashMap<>();
+       }
+
+       @Override
+       public void barrierAnnouncement(
+                       InputChannelInfo channelInfo,
+                       CheckpointBarrier announcedBarrier,
+                       int sequenceNumber) {
+               Integer previousValue = 
sequenceNumberInAnnouncedChannels.put(channelInfo, sequenceNumber);
+               checkState(
+                       previousValue == null,
+                       "Stream corrupt: Repeated barrierAnnouncement [%s] 
overwriting [%s] for the same checkpoint on input %s",
+                       announcedBarrier,
+                       previousValue, // report the sequence number being overwritten, not the new one
+                       channelInfo);
         }
 
        @Override
@@ -53,27 +78,29 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
        }
 
        @Override
-       public void barrierReceived(
+       public Optional<CheckpointBarrier> barrierReceived(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) {
                checkState(!blockedChannels.put(channelInfo, true), "Stream 
corrupt: Repeated barrier for same checkpoint on input " + channelInfo);
+               sequenceNumberInAnnouncedChannels.remove(channelInfo);
                CheckpointableInput input = inputs[channelInfo.getGateIdx()];
                input.blockConsumption(channelInfo);
+               return Optional.empty();
        }
 
        @Override
-       public boolean preProcessFirstBarrier(
+       public Optional<CheckpointBarrier> preProcessFirstBarrier(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) {
-               return false;
+               return Optional.empty();
        }
 
        @Override
-       public boolean postProcessLastBarrier(
+       public Optional<CheckpointBarrier> postProcessLastBarrier(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) throws IOException {
                resumeConsumption();
-               return true;
+               return Optional.of(barrier);
        }
 
        @Override
@@ -90,16 +117,29 @@ public class AlignedController implements 
CheckpointBarrierBehaviourController {
                resumeConsumption(channelInfo);
        }
 
-       private void resumeConsumption() throws IOException {
+       public Collection<InputChannelInfo> getBlockedChannels() {
+               return blockedChannels.entrySet()
+                       .stream()
+                       .filter(entry -> entry.getValue())
+                       .map(entry -> entry.getKey())
+                       .collect(Collectors.toSet());
+       }
+
+       public Map<InputChannelInfo, Integer> 
getSequenceNumberInAnnouncedChannels() {
+               return new HashMap<>(sequenceNumberInAnnouncedChannels);
+       }
+
+       public void resumeConsumption() throws IOException {
                for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : 
blockedChannels.entrySet()) {
                        if (blockedChannel.getValue()) {
                                resumeConsumption(blockedChannel.getKey());
                        }
                        blockedChannel.setValue(false);
                }
+               sequenceNumberInAnnouncedChannels.clear();
        }
 
-       private void resumeConsumption(InputChannelInfo channelInfo) throws 
IOException {
+       void resumeConsumption(InputChannelInfo channelInfo) throws IOException 
{
                CheckpointableInput input = inputs[channelInfo.getGateIdx()];
                input.resumeConsumption(channelInfo);
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index d040c66..f943638 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -20,10 +20,14 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -34,7 +38,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 public class AlternatingController implements 
CheckpointBarrierBehaviourController {
        private final AlignedController alignedController;
        private final UnalignedController unalignedController;
-       private  CheckpointBarrierBehaviourController activeController;
+
+       private CheckpointBarrierBehaviourController activeController;
+       private long timeOutedBarrierId = -1; // used to shortcut timeout check
 
        public AlternatingController(
                        AlignedController alignedController,
@@ -49,23 +55,107 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
        }
 
        @Override
-       public void barrierReceived(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) {
-               checkActiveController(barrier);
-               activeController.barrierReceived(channelInfo, barrier);
+       public void barrierAnnouncement(
+                       InputChannelInfo channelInfo,
+                       CheckpointBarrier announcedBarrier,
+                       int sequenceNumber) throws IOException {
+
+               Optional<CheckpointBarrier> maybeTimedOut = 
maybeTimeOut(announcedBarrier);
+               announcedBarrier = maybeTimedOut.orElse(announcedBarrier);
+
+               if (maybeTimedOut.isPresent() && activeController != 
unalignedController) {
+                       // Let's timeout this barrier
+                       unalignedController.barrierAnnouncement(channelInfo, 
announcedBarrier, sequenceNumber);
+               }
+               else {
+                       // Either we have already timed out before, or we are 
still going with aligned checkpoints
+                       activeController.barrierAnnouncement(channelInfo, 
announcedBarrier, sequenceNumber);
+               }
+       }
+
+       @Override
+       public Optional<CheckpointBarrier> barrierReceived(InputChannelInfo 
channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException 
{
+               if (barrier.getCheckpointOptions().isUnalignedCheckpoint() && 
activeController == alignedController) {
+                       switchToUnaligned(channelInfo, barrier);
+                       activeController.barrierReceived(channelInfo, barrier);
+                       return Optional.of(barrier);
+               }
+
+               Optional<CheckpointBarrier> maybeTimedOut = 
maybeTimeOut(barrier);
+               barrier = maybeTimedOut.orElse(barrier);
+
+               checkState(!activeController.barrierReceived(channelInfo, 
barrier).isPresent());
+
+               if (maybeTimedOut.isPresent()) {
+                       if (activeController == alignedController) {
+                               switchToUnaligned(channelInfo, barrier);
+                               return maybeTimedOut;
+                       }
+                       else {
+                               // TODO: add unit test for this
+                               
alignedController.resumeConsumption(channelInfo);
+                       }
+               }
+               return Optional.empty();
        }
 
        @Override
-       public boolean preProcessFirstBarrier(
+       public Optional<CheckpointBarrier> preProcessFirstBarrier(
                        InputChannelInfo channelInfo,
                        CheckpointBarrier barrier) throws IOException, 
CheckpointException {
-               checkActiveController(barrier);
                return activeController.preProcessFirstBarrier(channelInfo, 
barrier);
        }
 
+       private void switchToUnaligned(
+                       InputChannelInfo channelInfo,
+                       CheckpointBarrier barrier) throws IOException, 
CheckpointException {
+               checkState(alignedController == activeController);
+
+               // time out every announced-but-not-yet-received barrier: forward each 
announcement recorded by alignedController to unalignedController
+               for (Map.Entry<InputChannelInfo, Integer> entry : 
alignedController.getSequenceNumberInAnnouncedChannels().entrySet()) {
+                       InputChannelInfo unProcessedChannelInfo = 
entry.getKey();
+                       int announcedBarrierSequenceNumber = entry.getValue();
+                       
unalignedController.barrierAnnouncement(unProcessedChannelInfo, barrier, 
announcedBarrierSequenceNumber);
+               }
+
+               // snapshot blocked channels now: alignedController.resumeConsumption() below resets their blocked flags
+               Collection<InputChannelInfo> blockedChannels = 
alignedController.getBlockedChannels();
+
+               activeController = unalignedController;
+
+               // alignedController might have already processed some barriers, 
so "migrate"/forward those calls to unalignedController.
+               unalignedController.preProcessFirstBarrier(channelInfo, 
barrier);
+               for (InputChannelInfo blockedChannel : blockedChannels) {
+                       unalignedController.barrierReceived(blockedChannel, 
barrier);
+               }
+
+               alignedController.resumeConsumption();
+       }
+
        @Override
-       public boolean postProcessLastBarrier(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) throws IOException {
-               checkActiveController(barrier);
-               return activeController.postProcessLastBarrier(channelInfo, 
barrier);
+       public Optional<CheckpointBarrier> 
postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) 
throws IOException, CheckpointException {
+               Optional<CheckpointBarrier> maybeTimeOut = 
maybeTimeOut(barrier);
+               if (maybeTimeOut.isPresent() && activeController == 
alignedController) {
+                       switchToUnaligned(channelInfo, maybeTimeOut.get());
+                       checkState(activeController == unalignedController);
+                       
checkState(!activeController.postProcessLastBarrier(channelInfo, 
maybeTimeOut.get()).isPresent()); // presence checked by the enclosing if
+                       return maybeTimeOut;
+               }
+
+               barrier = maybeTimeOut.orElse(barrier); // from here on, barrier is the timed-out variant if one exists
+               if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
+                       checkState(activeController == unalignedController);
+                       
checkState(!activeController.postProcessLastBarrier(channelInfo, 
barrier).isPresent());
+                       return Optional.empty();
+               }
+               else {
+                       checkState(activeController == alignedController);
+                       Optional<CheckpointBarrier> triggerResult = 
activeController.postProcessLastBarrier(
+                               channelInfo,
+                               barrier);
+                       checkState(triggerResult.isPresent());
+                       return triggerResult;
+               }
        }
 
        @Override
@@ -94,4 +184,32 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
        private CheckpointBarrierBehaviourController 
chooseController(CheckpointBarrier barrier) {
                return isAligned(barrier) ? alignedController : 
unalignedController;
        }
+
+       /**
+        * Checks whether the alignment of the given barrier has timed out.
+        *
+        * <p>An already unaligned barrier never times out. Otherwise a barrier with timeoutable options
+        * times out if its checkpoint has already timed out before, or if more than the alignment timeout
+        * has passed since the barrier's timestamp. The id of a timed-out checkpoint is remembered, so
+        * that later calls for barriers of the same checkpoint also report a time out.
+        *
+        * @return a copy of the barrier with {@link CheckpointOptions#toTimeouted() timed-out options}
+        *     if the alignment timed out, otherwise {@link Optional#empty()}
+        */
+       private Optional<CheckpointBarrier> maybeTimeOut(CheckpointBarrier 
barrier) {
+               CheckpointOptions options = barrier.getCheckpointOptions();
+               boolean shouldTimeout = options.isTimeoutable() && (
+                       barrier.getId() == timeOutedBarrierId ||
+                               (System.currentTimeMillis() - 
barrier.getTimestamp()) > options.getAlignmentTimeout());
+               if (options.isUnalignedCheckpoint() || !shouldTimeout) {
+                       return Optional.empty();
+               }
+               else {
+                       timeOutedBarrierId = Math.max(timeOutedBarrierId, 
barrier.getId());
+                       return Optional.of(new CheckpointBarrier(
+                               barrier.getId(),
+                               barrier.getTimestamp(),
+                               options.toTimeouted()));
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
index 5939451..2a3908a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Controls when the checkpoint should be actually triggered.
@@ -37,23 +38,29 @@ public interface CheckpointBarrierBehaviourController {
        void preProcessFirstBarrierOrAnnouncement(CheckpointBarrier barrier);
 
        /**
+        * Invoked per every {@link CheckpointBarrier} announcement.
+        */
+       void barrierAnnouncement(InputChannelInfo channelInfo, 
CheckpointBarrier announcedBarrier, int sequenceNumber) throws IOException;
+
+       /**
         * Invoked per every received {@link CheckpointBarrier}.
+        * @return the {@link CheckpointBarrier} with which the checkpoint should be triggered, if any.
         */
-       void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier 
barrier);
+       Optional<CheckpointBarrier> barrierReceived(InputChannelInfo 
channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;
 
        /**
         * Invoked once per checkpoint, before the first invocation of
         * {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for 
that given checkpoint.
-        * @return {@code true} if checkpoint should be triggered.
+        * @return the {@link CheckpointBarrier} with which the checkpoint should be triggered, if any.
         */
-       boolean preProcessFirstBarrier(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) throws IOException, CheckpointException;
+       Optional<CheckpointBarrier> preProcessFirstBarrier(InputChannelInfo 
channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;
 
        /**
         * Invoked once per checkpoint, after the last invocation of
         * {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for 
that given checkpoint.
-        * @return {@code true} if checkpoint should be triggered.
+        * @return the {@link CheckpointBarrier} with which the checkpoint should be triggered, if any.
         */
-       boolean postProcessLastBarrier(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) throws IOException;
+       Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo 
channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;
 
        void abortPendingCheckpoint(long cancelledId, CheckpointException 
exception) throws IOException;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
index fa1ee00..b00b1fb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
+import org.apache.flink.util.function.TriFunctionWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
@@ -116,21 +118,15 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                                markAlignmentStart(barrier.getTimestamp());
                        }
                        allBarriersReceivedFuture = new CompletableFuture<>();
-                       try {
-                               if 
(controller.preProcessFirstBarrier(channelInfo, barrier)) {
-                                       LOG.debug("{}: Triggering checkpoint {} 
on the first barrier at {}.",
-                                               taskName,
-                                               barrier.getId(),
-                                               barrier.getTimestamp());
-                                       notifyCheckpoint(barrier);
-                               }
-                       } catch (CheckpointException e) {
-                               abortInternal(barrier.getId(), e);
+
+                       if (!handleBarrier(barrier, channelInfo, 
CheckpointBarrierBehaviourController::preProcessFirstBarrier)) {
                                return;
                        }
                }
 
-               controller.barrierReceived(channelInfo, barrier);
+               if (!handleBarrier(barrier, channelInfo, 
CheckpointBarrierBehaviourController::barrierReceived)) {
+                       return;
+               }
 
                if (currentCheckpointId == barrierId) {
                        if (++numBarriersReceived == numOpenChannels) {
@@ -139,25 +135,71 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
                                }
                                numBarriersReceived = 0;
                                lastCancelledOrCompletedCheckpointId = 
currentCheckpointId;
-                               if 
(controller.postProcessLastBarrier(channelInfo, barrier)) {
-                                       LOG.debug("{}: Triggering checkpoint {} 
on the last barrier at {}.",
-                                               taskName,
-                                               barrier.getId(),
-                                               barrier.getTimestamp());
-                                       notifyCheckpoint(barrier);
-                               }
+                               handleBarrier(barrier, channelInfo, 
CheckpointBarrierBehaviourController::postProcessLastBarrier);
                                allBarriersReceivedFuture.complete(null);
                        }
                }
        }
 
+       /**
+        * Applies the given controller action to the barrier and triggers the checkpoint if the action
+        * returns a barrier to trigger it with.
+        *
+        * @return {@code false} if the action failed with a {@link CheckpointException}, in which case
+        *     {@code abortInternal} has been called for the barrier's checkpoint; {@code true} otherwise
+        * @throws IOException if the action throws an {@link IOException}, or wrapping any other checked
+        *     exception thrown by the action except {@link CheckpointException}
+        */
+       private boolean handleBarrier(
+                       CheckpointBarrier barrier,
+                       InputChannelInfo channelInfo,
+                       TriFunctionWithException<
+                               CheckpointBarrierBehaviourController,
+                               InputChannelInfo,
+                               CheckpointBarrier,
+                               Optional<CheckpointBarrier>,
+                               Exception
+                       > controllerAction) throws IOException {
+               try {
+                       Optional<CheckpointBarrier> triggerMaybe = 
controllerAction.apply(controller, channelInfo, barrier);
+                       if (triggerMaybe.isPresent()) {
+                               CheckpointBarrier trigger = triggerMaybe.get();
+                               LOG.debug(
+                                       "{}: Triggering checkpoint {} on the barrier from channel {} at {}.",
+                                       taskName,
+                                       trigger.getId(),
+                                       channelInfo,
+                                       trigger.getTimestamp());
+                               notifyCheckpoint(trigger);
+                       }
+                       return true;
+               } catch (CheckpointException e) {
+                       abortInternal(barrier.getId(), e);
+                       return false;
+               } catch (RuntimeException | IOException e) {
+                       throw e;
+               } catch (Exception e) {
+                       throw new IOException(e);
+               }
+       }
+
        @Override
        public void processBarrierAnnouncement(
                        CheckpointBarrier announcedBarrier,
                        int sequenceNumber,
                        InputChannelInfo channelInfo) throws IOException {
                checkSubsumedCheckpoint(channelInfo, announcedBarrier);
-               // TODO: FLINK-19681
+
+               long barrierId = announcedBarrier.getId();
+               if (currentCheckpointId > barrierId || (currentCheckpointId == 
barrierId && !isCheckpointPending())) {
+                       LOG.debug("{}: Obsolete announcement of checkpoint {} 
for channel {}.",
+                                       taskName,
+                                       barrierId,
+                                       channelInfo);
+                       return;
+               }
+
+               controller.barrierAnnouncement(channelInfo, announcedBarrier, 
sequenceNumber);
        }
 
        private void checkSubsumedCheckpoint(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) throws IOException {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
index 9b7c80f..01bd056 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -128,6 +129,12 @@ public class StreamTaskSourceInput<T> implements 
StreamTaskInput<T>, Checkpointa
        }
 
        @Override
+       public void convertToPriorityEvent(int channelIndex, int 
sequenceNumber) throws IOException {
+               // Intentionally a no-op. NOTE(review): presumably a source input holds no buffered channel
+               // data that could be converted into a priority event - confirm.
+       }
+
+       @Override
        public int getInputIndex() {
                return inputIndex;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
index 3d5f575..d19533e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInpu
 import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * Controller for unaligned checkpoints.
@@ -48,22 +49,35 @@ public class UnalignedController implements 
CheckpointBarrierBehaviourController
        }
 
        @Override
-       public void barrierReceived(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) {
+       public void barrierAnnouncement(
+                       InputChannelInfo channelInfo,
+                       CheckpointBarrier announcedBarrier,
+                       int sequenceNumber) throws IOException {
+               // Convert the announced barrier into a priority event (presumably so it overtakes buffered data).
+               inputs[channelInfo.getGateIdx()].convertToPriorityEvent(
+                       channelInfo.getInputChannelIdx(),
+                       sequenceNumber);
+       }
+
+       @Override
+       public Optional<CheckpointBarrier> barrierReceived(InputChannelInfo 
channelInfo, CheckpointBarrier barrier) {
+               // Unaligned checkpoints are triggered in preProcessFirstBarrier, never on a received barrier.
+               return Optional.empty();
        }
 
        @Override
-       public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) throws IOException, CheckpointException {
+       public Optional<CheckpointBarrier> 
preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) 
throws IOException, CheckpointException {
                checkpointCoordinator.initCheckpoint(barrier.getId(), 
barrier.getCheckpointOptions());
                for (final CheckpointableInput input : inputs) {
                        input.checkpointStarted(barrier);
                }
-               return true;
+               return Optional.of(barrier);
        }
 
        @Override
-       public boolean postProcessLastBarrier(InputChannelInfo channelInfo, 
CheckpointBarrier barrier) {
+       public Optional<CheckpointBarrier> 
postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) 
{
                resetPendingCheckpoint(barrier.getId());
-               return false;
+               return Optional.empty();
        }
 
        private void resetPendingCheckpoint(long cancelledId) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index 008ea9f..41b297d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -86,6 +86,14 @@ public class MailboxProcessor implements Closeable {
 
        private Meter idleTime = new MeterView(new SimpleCounter());
 
+       /**
+        * Creates a processor whose default action does nothing but suspend the default action.
+        */
+       @VisibleForTesting
+       public MailboxProcessor() {
+               this(MailboxDefaultAction.Controller::suspendDefaultAction);
+       }
+
        public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
                this(mailboxDefaultAction, StreamTaskActionExecutor.IMMEDIATE);
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
index f2bd319..b9eac08 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlignedControllerTest.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
+import org.hamcrest.collection.IsMapContaining;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,11 +51,16 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.stream.IntStream;
 
 import static 
org.apache.flink.streaming.runtime.io.UnalignedControllerTest.addSequence;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -143,6 +149,30 @@ public class AlignedControllerTest {
        //  Tests
        // 
------------------------------------------------------------------------
 
+       @Test
+       public void testGetChannelsWithUnprocessedBarriers() throws IOException 
{
+               mockInputGate = new MockInputGate(4, Collections.emptyList());
+               AlignedController alignedController = new 
AlignedController(mockInputGate);
+               BufferOrEvent barrier0 = createBarrier(1, 0);
+               BufferOrEvent barrier1 = createBarrier(1, 1);
+               BufferOrEvent barrier3 = createBarrier(1, 3);
+               
alignedController.barrierAnnouncement(barrier0.getChannelInfo(), 
(CheckpointBarrier) barrier0.getEvent(), 0);
+               alignedController.barrierReceived(barrier0.getChannelInfo(), 
(CheckpointBarrier) barrier0.getEvent());
+               
alignedController.barrierAnnouncement(barrier1.getChannelInfo(), 
(CheckpointBarrier) barrier1.getEvent(), 1);
+               
alignedController.barrierAnnouncement(barrier3.getChannelInfo(), 
(CheckpointBarrier) barrier3.getEvent(), 42);
+
+               Collection<InputChannelInfo> blockedChannels = 
alignedController.getBlockedChannels();
+               Map<InputChannelInfo, Integer> announcedChannels = 
alignedController.getSequenceNumberInAnnouncedChannels();
+
+               // blockedChannels and announcedChannels should be copies and 
shouldn't be cleared by the resumeConsumption
+               alignedController.resumeConsumption();
+
+               assertThat(blockedChannels, 
contains(barrier0.getChannelInfo()));
+               assertThat(announcedChannels, 
IsMapContaining.hasEntry(barrier1.getChannelInfo(), 1));
+               assertThat(announcedChannels, 
IsMapContaining.hasEntry(barrier3.getChannelInfo(), 42));
+               assertThat(announcedChannels.size(), equalTo(2));
+       }
+
        /**
         * Validates that the buffer behaves correctly if no checkpoint 
barriers come,
         * for a single input channel.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
index db958f3..8d81df2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
@@ -19,11 +19,18 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EventAnnouncement;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
@@ -32,12 +39,14 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
 import 
org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static java.util.Collections.singletonList;
 import static junit.framework.TestCase.assertTrue;
@@ -45,8 +54,11 @@ import static 
org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer.toBuffer;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -84,7 +96,7 @@ public class AlternatingControllerTest {
        }
 
        @Test
-       public void testAlignedTimeoutableCheckpoint() throws Exception {
+       public void testAlignedNeverTimeoutableCheckpoint() throws Exception {
                int numChannels = 2;
                int bufferSize = 1000;
                ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
@@ -95,14 +107,248 @@ public class AlternatingControllerTest {
                Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, 
checkpointCreationTime, Long.MAX_VALUE);
                send(neverTimeoutableCheckpoint, gate, 0);
                sendBuffer(bufferSize, gate, 1);
-
                assertEquals(0, target.getTriggeredCheckpointCounter());
 
                send(neverTimeoutableCheckpoint, gate, 1);
+               assertEquals(1, target.getTriggeredCheckpointCounter());
+       }
+
+       @Test
+       public void testTimeoutAlignment() throws Exception {
+               int numChannels = 2;
+               ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
+               CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels);
+
+               testTimeoutBarrierOnTwoChannels(target, gate);
+       }
+
+       @Test
+       public void testTimeoutAlignmentAfterProcessingBarrier() throws 
Exception {
+               int numChannels = 3;
+               ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
+               CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels);
 
+               long alignmentTimeout = 10;
+               long checkpointCreationTime = System.currentTimeMillis() - 2 * 
alignmentTimeout;
+               Buffer neverTimeoutableCheckpoint = barrier(1, CHECKPOINT, 
checkpointCreationTime, Long.MAX_VALUE);
+
+               RemoteInputChannel channel2 = (RemoteInputChannel) 
gate.getChannel(2);
+
+               channel2.onBuffer(neverTimeoutableCheckpoint.retainBuffer(), 0, 
0);
+               while (gate.pollNext().isPresent()) {
+               }
+
+               assertEquals(0, target.getTriggeredCheckpointCounter());
+
+               testTimeoutBarrierOnTwoChannels(target, gate);
+       }
+
+       private void 
testTimeoutBarrierOnTwoChannels(ValidatingCheckpointHandler target, 
CheckpointedInputGate gate) throws Exception {
+               int bufferSize = 1000;
+               long alignmentTimeout = 10;
+               long checkpointCreationTime = System.currentTimeMillis() - 2 * 
alignmentTimeout;
+               Buffer checkpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, alignmentTimeout);
+               Buffer buffer = TestBufferFactory.createBuffer(bufferSize);
+
+               RemoteInputChannel channel0 = (RemoteInputChannel) 
gate.getChannel(0);
+               RemoteInputChannel channel1 = (RemoteInputChannel) 
gate.getChannel(1);
+               channel0.onBuffer(buffer.retainBuffer(), 0, 0);
+               channel0.onBuffer(buffer.retainBuffer(), 1, 0);
+               channel0.onBuffer(checkpointBarrier.retainBuffer(), 2, 0);
+               channel1.onBuffer(buffer.retainBuffer(), 0, 0);
+               channel1.onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
+
+               assertEquals(0, target.getTriggeredCheckpointCounter());
+               // First announcements and prioritised barriers
+               List<AbstractEvent> events = new ArrayList<>();
+               events.add(gate.pollNext().get().getEvent());
+               events.add(gate.pollNext().get().getEvent());
+               events.add(gate.pollNext().get().getEvent());
+               events.add(gate.pollNext().get().getEvent());
+               assertThat(events, containsInAnyOrder(
+                       instanceOf(EventAnnouncement.class),
+                       instanceOf(EventAnnouncement.class),
+                       instanceOf(CheckpointBarrier.class),
+                       instanceOf(CheckpointBarrier.class)));
+               assertEquals(1, target.getTriggeredCheckpointCounter());
+               assertThat(
+                       target.getTriggeredCheckpointOptions(),
+                       contains(CheckpointOptions.create(
+                               CHECKPOINT,
+                               CheckpointStorageLocationReference.getDefault(),
+                               true,
+                               true,
+                               0)));
+               // Followed by overtaken buffers
+               assertFalse(gate.pollNext().get().isEvent());
+               assertFalse(gate.pollNext().get().isEvent());
+               assertFalse(gate.pollNext().get().isEvent());
+       }
+
+       /**
+        * This test tries to make sure that the first time out happens after 
processing
+        * {@link EventAnnouncement} but before/during processing the first 
{@link CheckpointBarrier}.
+        */
+       @Test
+       public void testTimeoutAlignmentOnFirstBarrier() throws Exception {
+               int numChannels = 2;
+               ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
+               CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels);
+
+               long alignmentTimeout = 100;
+               long checkpointCreationTime = System.currentTimeMillis();
+               Buffer checkpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, alignmentTimeout);
+
+               RemoteInputChannel channel0 = (RemoteInputChannel) 
gate.getChannel(0);
+               RemoteInputChannel channel1 = (RemoteInputChannel) 
gate.getChannel(1);
+               channel0.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+               channel1.onBuffer(checkpointBarrier.retainBuffer(), 0, 0);
+
+               assertEquals(0, target.getTriggeredCheckpointCounter());
+               // First announcements and prioritised barriers
+               List<AbstractEvent> events = new ArrayList<>();
+               events.add(gate.pollNext().get().getEvent());
+               events.add(gate.pollNext().get().getEvent());
+
+               Thread.sleep(alignmentTimeout * 2);
+
+               events.add(gate.pollNext().get().getEvent());
+               assertThat(events, contains(
+                       instanceOf(EventAnnouncement.class),
+                       instanceOf(EventAnnouncement.class),
+                       instanceOf(CheckpointBarrier.class)));
                assertEquals(1, target.getTriggeredCheckpointCounter());
        }
 
+       /**
+        * First we process aligned {@link CheckpointBarrier} and after that we 
receive an already unaligned
+        * {@link CheckpointBarrier}, that has timed out on an upstream task.
+        */
+       @Test
+       public void testTimeoutAlignmentOnUnalignedCheckpoint() throws 
Exception {
+               int numChannels = 2;
+               ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
+               RecordingChannelStateWriter channelStateWriter = new 
RecordingChannelStateWriter();
+               CheckpointedInputGate gate = buildRemoteInputGate(target, 
numChannels, channelStateWriter);
+
+               long checkpointCreationTime = System.currentTimeMillis();
+               Buffer alignedCheckpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, Integer.MAX_VALUE);
+               Buffer unalignedCheckpointBarrier = barrier(1, CHECKPOINT, 
checkpointCreationTime, 0);
+               Buffer buffer = TestBufferFactory.createBuffer(1000);
+
+               RemoteInputChannel channel0 = (RemoteInputChannel) 
gate.getChannel(0);
+               RemoteInputChannel channel1 = (RemoteInputChannel) 
gate.getChannel(1);
+               channel0.onBuffer(alignedCheckpointBarrier.retainBuffer(), 0, 
0);
+
+               List<AbstractEvent> events = new ArrayList<>();
+               events.add(gate.pollNext().get().getEvent());
+               events.add(gate.pollNext().get().getEvent());
+
+               assertThat(events, contains(
+                       instanceOf(EventAnnouncement.class),
+                       instanceOf(CheckpointBarrier.class)));
+
+               channel1.onBuffer(buffer.retainBuffer(), 0, 0);
+               channel1.onBuffer(buffer.retainBuffer(), 1, 0);
+               channel1.onBuffer(unalignedCheckpointBarrier.retainBuffer(), 2, 
0);
+
+               events.add(gate.pollNext().get().getEvent());
+
+               assertThat(events, contains(
+                       instanceOf(EventAnnouncement.class),
+                       instanceOf(CheckpointBarrier.class),
+                       instanceOf(CheckpointBarrier.class)));
+
+               assertEquals(2, channelStateWriter.getAddedInput().get(channel1.getChannelInfo()).size());
+               assertEquals(1, target.getTriggeredCheckpointCounter());
+       }
+
+       @Test
+       public void testTimeoutAlignmentConsistencyOnPreProcessBarrier() throws 
Exception {
+               testTimeoutAlignmentConsistency(true, false, false);
+       }
+
+       @Test
+       public void testTimeoutAlignmentConsistencyOnProcessBarrier() throws 
Exception {
+               testTimeoutAlignmentConsistency(false, true, false);
+       }
+
+       @Test
+       public void testTimeoutAlignmentConsistencyOnPostProcessBarrier() 
throws Exception {
+               testTimeoutAlignmentConsistency(false, false, true);
+       }
+
+       private void testTimeoutAlignmentConsistency(
+                       boolean sleepBeforePreProcess,
+                       boolean sleepBeforeProcess,
+                       boolean sleepBeforePostProcess) throws Exception {
+               ValidatingCheckpointHandler target = new 
ValidatingCheckpointHandler();
+               SingleInputGate gate = new 
SingleInputGateBuilder().setNumberOfChannels(1).build();
+               TestInputChannel channel0 = new TestInputChannel(gate, 0, 
false, true);
+               gate.setInputChannels(channel0);
+
+               RecordingChannelStateWriter channelStateWriter = new 
RecordingChannelStateWriter();
+               AlternatingController controller = new AlternatingController(
+                       new AlignedController(gate),
+                       new UnalignedController(
+                               new 
TestSubtaskCheckpointCoordinator(channelStateWriter),
+                               gate));
+
+               long checkpointCreationTime = System.currentTimeMillis();
+               long alignmentTimeout = 10;
+               CheckpointBarrier barrier = checkpointBarrier(1, CHECKPOINT, 
checkpointCreationTime, alignmentTimeout);
+
+               InputChannelInfo channelInfo = channel0.getChannelInfo();
+
+               controller.preProcessFirstBarrierOrAnnouncement(barrier);
+               controller.barrierAnnouncement(channelInfo, barrier, 1);
+
+               if (sleepBeforePreProcess) {
+                       Thread.sleep(alignmentTimeout * 2);
+               }
+               Optional<CheckpointBarrier> preProcessTrigger = 
controller.preProcessFirstBarrier(channelInfo, barrier);
+               if (sleepBeforeProcess) {
+                       Thread.sleep(alignmentTimeout * 2);
+               }
+               Optional<CheckpointBarrier> processTrigger = 
controller.barrierReceived(channelInfo, barrier);
+               if (sleepBeforePostProcess) {
+                       Thread.sleep(alignmentTimeout * 2);
+               }
+               Optional<CheckpointBarrier> postProcessTrigger = 
controller.postProcessLastBarrier(channelInfo, barrier);
+
+               int triggeredCount = 0;
+               boolean unalignedCheckpoint = false;
+               if (preProcessTrigger.isPresent()) {
+                       triggeredCount++;
+                       unalignedCheckpoint = 
preProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
+                       assertTrue(unalignedCheckpoint);
+               }
+               if (processTrigger.isPresent()) {
+                       triggeredCount++;
+                       unalignedCheckpoint = 
processTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
+                       assertTrue(unalignedCheckpoint);
+               }
+               if (postProcessTrigger.isPresent()) {
+                       triggeredCount++;
+                       unalignedCheckpoint = 
postProcessTrigger.get().getCheckpointOptions().isUnalignedCheckpoint();
+               }
+
+               assertEquals(
+                       String.format(
+                               "Checkpoint should be triggered exactly once, 
but [%s, %s, %s] was found instead",
+                               preProcessTrigger.isPresent(),
+                               processTrigger.isPresent(),
+                               postProcessTrigger.isPresent()),
+                       1,
+                       triggeredCount);
+
+               if (unalignedCheckpoint) {
+                       // check that we can add output data if we are in 
unaligned checkpoint mode. In other words
+                       // if the state writer has been initialised correctly.
+                       assertEquals(barrier.getId(), 
channelStateWriter.getLastStartedCheckpointId());
+               }
+       }
+
        @Test
        public void testMetricsAlternation() throws Exception {
                int numChannels = 2;
@@ -362,6 +608,13 @@ public class AlternatingControllerTest {
        }
 
        private static SingleCheckpointBarrierHandler 
barrierHandler(SingleInputGate inputGate, AbstractInvokable target) {
+               return barrierHandler(inputGate, target, new 
RecordingChannelStateWriter());
+       }
+
+       private static SingleCheckpointBarrierHandler barrierHandler(
+                       SingleInputGate inputGate,
+                       AbstractInvokable target,
+                       ChannelStateWriter stateWriter) {
                String taskName = "test";
                return new SingleCheckpointBarrierHandler(
                        taskName,
@@ -369,7 +622,7 @@ public class AlternatingControllerTest {
                        inputGate.getNumberOfInputChannels(),
                        new AlternatingController(
                                new AlignedController(inputGate),
-                               new 
UnalignedController(TestSubtaskCheckpointCoordinator.INSTANCE, inputGate)));
+                               new UnalignedController(new 
TestSubtaskCheckpointCoordinator(stateWriter), inputGate)));
        }
 
        private Buffer barrier(long barrierId, CheckpointType checkpointType) 
throws IOException {
@@ -381,18 +634,27 @@ public class AlternatingControllerTest {
        }
 
        private Buffer barrier(long barrierId, CheckpointType checkpointType, 
long barrierTimestamp, long alignmentTimeout) throws IOException {
+               CheckpointBarrier checkpointBarrier = checkpointBarrier(
+                       barrierId,
+                       checkpointType,
+                       barrierTimestamp,
+                       alignmentTimeout);
+               return toBuffer(
+                       checkpointBarrier,
+                       
checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint());
+       }
+
+       private CheckpointBarrier checkpointBarrier(long barrierId, 
CheckpointType checkpointType, long barrierTimestamp, long alignmentTimeout) {
                CheckpointOptions options = CheckpointOptions.create(
                        checkpointType,
                        CheckpointStorageLocationReference.getDefault(),
                        true,
                        true,
                        alignmentTimeout);
-               return toBuffer(
-                       new CheckpointBarrier(
-                               barrierId,
-                               barrierTimestamp,
-                               options),
-                       options.isUnalignedCheckpoint());
+               return new CheckpointBarrier(
+                       barrierId,
+                       barrierTimestamp,
+                       options);
        }
 
        private static CheckpointedInputGate buildGate(AbstractInvokable 
target, int numChannels) {
@@ -405,4 +667,30 @@ public class AlternatingControllerTest {
                return new CheckpointedInputGate(gate, barrierHandler(gate, 
target), new SyncMailboxExecutor());
        }
 
+       private static CheckpointedInputGate buildRemoteInputGate(
+                       AbstractInvokable target,
+                       int numChannels) throws IOException {
+               return buildRemoteInputGate(target, numChannels, new 
RecordingChannelStateWriter());
+       }
+
+       private static CheckpointedInputGate buildRemoteInputGate(
+                       AbstractInvokable target,
+                       int numChannels,
+                       ChannelStateWriter channelStateWriter) throws 
IOException {
+               int maxUsedBuffers = 10;
+               NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(numChannels * maxUsedBuffers, 4096);
+               SingleInputGate gate = new SingleInputGateBuilder()
+                       
.setChannelFactory(InputChannelBuilder::buildRemoteChannel)
+                       .setNumberOfChannels(numChannels)
+                       .setSegmentProvider(networkBufferPool)
+                       
.setBufferPoolFactory(networkBufferPool.createBufferPool(numChannels, 
maxUsedBuffers))
+                       .setChannelStateWriter(channelStateWriter)
+                       .build();
+               gate.setup();
+               gate.requestPartitions();
+               // do not fire events automatically. If you need events, you 
should expose mailboxProcessor and
+               // execute it step by step
+               MailboxProcessor mailboxProcessor = new MailboxProcessor();
+               return new CheckpointedInputGate(gate, barrierHandler(gate, 
target, channelStateWriter), mailboxProcessor.getMainMailboxExecutor());
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
index f426289..ce36b39 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/ValidatingCheckpointHandler.java
@@ -46,7 +46,8 @@ public class ValidatingCheckpointHandler extends 
AbstractInvokable {
        protected long abortedCheckpointCounter = 0;
        protected CompletableFuture<Long> lastAlignmentDurationNanos;
        protected CompletableFuture<Long> lastBytesProcessedDuringAlignment;
-       protected List<Long> triggeredCheckpoints = new ArrayList<>();
+       protected final List<Long> triggeredCheckpoints = new ArrayList<>();
+       protected final List<CheckpointOptions> triggeredCheckpointOptions = 
new ArrayList<>();
 
        public ValidatingCheckpointHandler() {
                this(-1);
@@ -89,6 +90,10 @@ public class ValidatingCheckpointHandler extends 
AbstractInvokable {
                return lastBytesProcessedDuringAlignment;
        }
 
+       public List<CheckpointOptions> getTriggeredCheckpointOptions() {
+               return triggeredCheckpointOptions;
+       }
+
        @Override
        public void invoke() {
                throw new UnsupportedOperationException();
@@ -119,6 +124,7 @@ public class ValidatingCheckpointHandler extends 
AbstractInvokable {
                lastBytesProcessedDuringAlignment = 
checkpointMetrics.getBytesProcessedDuringAlignment();
 
                triggeredCheckpoints.add(checkpointMetaData.getCheckpointId());
+               triggeredCheckpointOptions.add(checkpointOptions);
        }
 
        @Override

Reply via email to