This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dd8f4e2603309493300099396568ffc681e76e80
Author: fanrui <[email protected]>
AuthorDate: Sat May 14 22:50:39 2022 +0800

    [FLINK-27251][checkpoint] Timeout aligned to unaligned checkpoint barrier in the output buffers
---
 .../runtime/checkpoint/CheckpointOptions.java      |   6 +-
 .../channel/ChannelStateWriteRequest.java          |  61 ++++++-
 .../checkpoint/channel/ChannelStateWriter.java     |  24 +++
 .../checkpoint/channel/ChannelStateWriterImpl.java |  18 ++
 .../io/network/api/writer/RecordWriter.java        |   9 +
 .../network/api/writer/ResultPartitionWriter.java  |   7 +
 .../partition/BoundedBlockingSubpartition.java     |  11 ++
 .../partition/BufferWritingResultPartition.java    |  15 ++
 .../network/partition/PipelinedSubpartition.java   | 196 ++++++++++++++++++++-
 .../io/network/partition/ResultSubpartition.java   |   5 +
 .../partition/SortMergeResultPartition.java        |  11 ++
 .../ChannelStateWriteRequestDispatcherTest.java    |  19 ++
 .../checkpoint/channel/MockChannelStateWriter.java |  19 ++
 .../network/partition/InputChannelTestUtils.java   |   2 +
 .../partition/MockResultPartitionWriter.java       |   7 +
 .../partition/PipelinedSubpartitionTest.java       | 143 +++++++++++++++
 .../runtime/state/ChannelPersistenceITCase.java    |  34 +++-
 .../streaming/runtime/io/RecordWriterOutput.java   |   9 +
 ...tractAlternatingAlignedBarrierHandlerState.java |   1 +
 .../AlternatingCollectingBarriers.java             |   1 +
 .../streaming/runtime/tasks/OperatorChain.java     |  13 ++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  22 ++-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 111 ++++++++++--
 .../MockSubtaskCheckpointCoordinatorBuilder.java   |   3 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |  61 ++++++-
 25 files changed, 766 insertions(+), 42 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index bc5b1dd9006..eaa30a3f81c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -186,6 +186,10 @@ public class CheckpointOptions implements Serializable {
         return alignmentType == AlignmentType.UNALIGNED;
     }
 
+    public boolean needsChannelState() {
+        return isUnalignedCheckpoint() || isTimeoutable();
+    }
+
     public CheckpointOptions withUnalignedSupported() {
         if (alignmentType == AlignmentType.FORCED_ALIGNED) {
             return alignedCheckpointTimeout != NO_ALIGNED_CHECKPOINT_TIME_OUT
@@ -196,7 +200,7 @@ public class CheckpointOptions implements Serializable {
     }
 
     public CheckpointOptions withUnalignedUnsupported() {
-        if (isUnalignedCheckpoint() || isTimeoutable()) {
+        if (needsChannelState()) {
             return forceAligned(checkpointType, targetLocation, alignedCheckpointTimeout);
         }
         return this;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
index d8d19232df8..91a96a1d886 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java
@@ -24,6 +24,9 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingConsumer;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
@@ -72,6 +75,48 @@ interface ChannelStateWriteRequest {
                 (writer, buffer) -> writer.writeOutput(info, buffer));
     }
 
+    static ChannelStateWriteRequest write(
+            long checkpointId,
+            ResultSubpartitionInfo info,
+            CompletableFuture<List<Buffer>> dataFuture) {
+        return buildFutureWriteRequest(
+                checkpointId,
+                "writeOutputFuture",
+                dataFuture,
+                (writer, buffer) -> writer.writeOutput(info, buffer));
+    }
+
+    static ChannelStateWriteRequest buildFutureWriteRequest(
+            long checkpointId,
+            String name,
+            CompletableFuture<List<Buffer>> dataFuture,
+            BiConsumer<ChannelStateCheckpointWriter, Buffer> bufferConsumer) {
+        return new CheckpointInProgressRequest(
+                name,
+                checkpointId,
+                writer -> {
+                    List<Buffer> buffers;
+                    try {
+                        buffers = dataFuture.get();
+                    } catch (ExecutionException e) {
+                        // If dataFuture fails, fail only the single related writer
+                        writer.fail(e);
+                        return;
+                    }
+                    for (Buffer buffer : buffers) {
+                        checkBufferIsBuffer(buffer);
+                        bufferConsumer.accept(writer, buffer);
+                    }
+                },
+                throwable -> {
+                    try {
+                        CloseableIterator.fromList(dataFuture.get(), Buffer::recycleBuffer).close();
+                    } catch (ExecutionException ignored) {
+                    }
+                },
+                false);
+    }
+
     static ChannelStateWriteRequest buildWriteRequest(
             long checkpointId,
             String name,
@@ -83,12 +128,7 @@ interface ChannelStateWriteRequest {
                 writer -> {
                     while (iterator.hasNext()) {
                         Buffer buffer = iterator.next();
-                        try {
-                            checkArgument(buffer.isBuffer());
-                        } catch (Exception e) {
-                            buffer.recycleBuffer();
-                            throw e;
-                        }
+                        checkBufferIsBuffer(buffer);
                         bufferConsumer.accept(writer, buffer);
                     }
                 },
@@ -96,6 +136,15 @@ interface ChannelStateWriteRequest {
                 false);
     }
 
+    static void checkBufferIsBuffer(Buffer buffer) {
+        try {
+            checkArgument(buffer.isBuffer());
+        } catch (Exception e) {
+            buffer.recycleBuffer();
+            throw e;
+        }
+    }
+
     static ChannelStateWriteRequest start(
             long checkpointId,
             ChannelStateWriteResult targetResult,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 43a81d114fc..74eef6c9336 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.CloseableIterator;
 import java.io.Closeable;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 /** Writes channel state during checkpoint/savepoint. */
@@ -128,6 +129,22 @@ public interface ChannelStateWriter extends Closeable {
             long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data)
             throws IllegalArgumentException;
 
+    /**
+     * Add in-flight bufferFuture from the {@link
+     * org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition}. Must be
+     * called after {@link #start} and before {@link #finishOutput(long)}. Buffers are recycled
+     * after they are written or exception occurs.
+     *
+     * <p>The method will be called when the unaligned checkpoint is enabled and received an aligned
+     * barrier.
+     */
+    void addOutputDataFuture(
+            long checkpointId,
+            ResultSubpartitionInfo info,
+            int startSeqNum,
+            CompletableFuture<List<Buffer>> data)
+            throws IllegalArgumentException;
+
     /**
      * Finalize write of channel state data for the given checkpoint id. Must be called after {@link
      * #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added.
@@ -178,6 +195,13 @@ public interface ChannelStateWriter extends Closeable {
         public void addOutputData(
                 long checkpointId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {}
 
+        @Override
+        public void addOutputDataFuture(
+                long checkpointId,
+                ResultSubpartitionInfo info,
+                int startSeqNum,
+                CompletableFuture<List<Buffer>> data) {}
+
         @Override
         public void finishInput(long checkpointId) {}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index ef8f58b594a..40d09f1a21a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -171,6 +173,22 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
         enqueue(write(checkpointId, info, data), false);
     }
 
+    @Override
+    public void addOutputDataFuture(
+            long checkpointId,
+            ResultSubpartitionInfo info,
+            int startSeqNum,
+            CompletableFuture<List<Buffer>> dataFuture)
+            throws IllegalArgumentException {
+        LOG.trace(
+                "{} adding output data future, checkpoint {}, channel: {}, startSeqNum: {}",
+                taskName,
+                checkpointId,
+                info,
+                startSeqNum);
+        enqueue(write(checkpointId, info, dataFuture), false);
+    }
+
     @Override
     public void finishInput(long checkpointId) {
         LOG.debug("{} finishing input data, checkpoint {}", taskName, checkpointId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4be9941abdd..9d5f351ef89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
@@ -122,6 +123,14 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
         }
     }
 
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        targetPartition.alignedBarrierTimeout(checkpointId);
+    }
+
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        targetPartition.abortCheckpoint(checkpointId, cause);
+    }
+
     @VisibleForTesting
     public static ByteBuffer serializeRecord(
             DataOutputSerializer serializer, IOReadableWritable record) throws IOException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 081565fc83e..9a505722f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.api.StopMode;
@@ -67,6 +68,12 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
     /** Writes the given {@link AbstractEvent} to all channels. */
     void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;
 
+    /** Timeout the aligned barrier to unaligned barrier. */
+    void alignedBarrierTimeout(long checkpointId) throws IOException;
+
+    /** Abort the checkpoint. */
+    void abortCheckpoint(long checkpointId, CheckpointException cause);
+
     /**
      * Notifies the downstream tasks that this {@code ResultPartitionWriter} have emitted all the
      * user records.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 0a5b0fcab73..c09975cc606 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -291,6 +292,16 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
         return data.getSize();
     }
 
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) {
+        // Nothing to do.
+    }
+
+    @Override
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        // Nothing to do.
+    }
+
     int getBuffersInBacklogUnsafe() {
         return numDataBuffersWritten;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 4e82d5beda1..2422f3ad808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
@@ -206,6 +207,20 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
         }
     }
 
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        for (ResultSubpartition subpartition : subpartitions) {
+            subpartition.alignedBarrierTimeout(checkpointId);
+        }
+    }
+
+    @Override
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        for (ResultSubpartition subpartition : subpartitions) {
+            subpartition.abortCheckpoint(checkpointId, cause);
+        }
+    }
+
     @Override
     public void setMetricGroup(TaskIOMetricGroup metrics) {
         super.setMetricGroup(metrics);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 125e38fe34d..8be8a287836 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -41,8 +42,10 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -112,6 +115,17 @@ public class PipelinedSubpartition extends ResultSubpartition
 
     private int bufferSize = Integer.MAX_VALUE;
 
+    /** The channelState Future of unaligned checkpoint. */
+    @GuardedBy("buffers")
+    private CompletableFuture<List<Buffer>> channelStateFuture;
+
+    /**
+     * It is the checkpointId corresponding to channelStateFuture. And It should be always update
+     * with {@link #channelStateFuture}.
+     */
+    @GuardedBy("buffers")
+    private long channelStateCheckpointId;
+
     /**
      * Whether this subpartition is blocked (e.g. by exactly once checkpoint) and is waiting for
      * resumption.
@@ -205,6 +219,9 @@ public class PipelinedSubpartition extends ResultSubpartition
         assert Thread.holdsLock(buffers);
         if (bufferConsumer.getDataType().hasPriority()) {
             return processPriorityBuffer(bufferConsumer, partialRecordLength);
+        } else if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                == bufferConsumer.getDataType()) {
+            processTimeoutableCheckpointBarrier(bufferConsumer);
         }
         buffers.add(new BufferConsumerWithPartialRecordLength(bufferConsumer, partialRecordLength));
         return false;
@@ -240,9 +257,151 @@ public class PipelinedSubpartition extends ResultSubpartition
                         inflightBuffers.toArray(new Buffer[0]));
             }
         }
-        return numPriorityElements == 1
-                && !isBlocked; // if subpartition is blocked then downstream doesn't expect any
-        // notifications
+        return needNotifyPriorityEvent();
+    }
+
+    // It just be called after add priorityEvent.
+    private boolean needNotifyPriorityEvent() {
+        assert Thread.holdsLock(buffers);
+        // if subpartition is blocked then downstream doesn't expect any notifications
+        return buffers.getNumPriorityElements() == 1 && !isBlocked;
+    }
+
+    private void processTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) {
+        CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        channelStateWriter.addOutputDataFuture(
+                barrier.getId(),
+                subpartitionInfo,
+                ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
+                createChannelStateFuture(barrier.getId()));
+    }
+
+    private CompletableFuture<List<Buffer>> createChannelStateFuture(long checkpointId) {
+        assert Thread.holdsLock(buffers);
+        if (channelStateFuture != null) {
+            completeChannelStateFuture(
+                    null,
+                    new IllegalStateException(
+                            String.format(
+                                    "%s has uncompleted channelStateFuture of checkpointId=%s, but it received "
+                                            + "a new timeoutable checkpoint barrier of checkpointId=%s, it maybe "
+                                            + "a bug due to currently not supported concurrent unaligned checkpoint.",
+                                    this, channelStateCheckpointId, checkpointId)));
+        }
+        channelStateFuture = new CompletableFuture<>();
+        channelStateCheckpointId = checkpointId;
+        return channelStateFuture;
+    }
+
+    private void completeChannelStateFuture(List<Buffer> channelResult, Throwable e) {
+        assert Thread.holdsLock(buffers);
+        if (e != null) {
+            channelStateFuture.completeExceptionally(e);
+        } else {
+            channelStateFuture.complete(channelResult);
+        }
+        channelStateFuture = null;
+    }
+
+    private boolean isChannelStateFutureAvailable(long checkpointId) {
+        assert Thread.holdsLock(buffers);
+        return channelStateFuture != null && channelStateCheckpointId == checkpointId;
+    }
+
+    private CheckpointBarrier parseAndCheckTimeoutableCheckpointBarrier(
+            BufferConsumer bufferConsumer) {
+        CheckpointBarrier barrier = parseCheckpointBarrier(bufferConsumer);
+        checkArgument(barrier != null, "Parse the timeoutable Checkpoint Barrier failed.");
+        checkState(
+                barrier.getCheckpointOptions().isTimeoutable()
+                        && Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                                == bufferConsumer.getDataType());
+        return barrier;
+    }
+
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        int prioritySequenceNumber = DEFAULT_PRIORITY_SEQUENCE_NUMBER;
+        synchronized (buffers) {
+            // The checkpoint barrier has sent to downstream, so nothing to do.
+            if (!isChannelStateFutureAvailable(checkpointId)) {
+                return;
+            }
+
+            // 1. find inflightBuffers and timeout the aligned barrier to unaligned barrier
+            List<Buffer> inflightBuffers = new ArrayList<>();
+            try {
+                if (findInflightBuffersAndMakeBarrierToPriority(checkpointId, inflightBuffers)) {
+                    prioritySequenceNumber = sequenceNumber;
+                }
+            } catch (IOException e) {
+                inflightBuffers.forEach(Buffer::recycleBuffer);
+                completeChannelStateFuture(null, e);
+                throw e;
+            }
+
+            // 2. complete the channelStateFuture
+            completeChannelStateFuture(inflightBuffers, null);
+        }
+
+        // 3. notify downstream read barrier, it must be called outside the buffers_lock to avoid
+        // the deadlock.
+        notifyPriorityEvent(prioritySequenceNumber);
+    }
+
+    @Override
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        synchronized (buffers) {
+            if (isChannelStateFutureAvailable(checkpointId)) {
+                completeChannelStateFuture(null, cause);
+            }
+        }
+    }
+
+    private boolean findInflightBuffersAndMakeBarrierToPriority(
+            long checkpointId, List<Buffer> inflightBuffers) throws IOException {
+        // 1. record the buffers before barrier as inflightBuffers
+        final int numPriorityElements = buffers.getNumPriorityElements();
+        final Iterator<BufferConsumerWithPartialRecordLength> iterator = buffers.iterator();
+        Iterators.advance(iterator, numPriorityElements);
+
+        BufferConsumerWithPartialRecordLength element = null;
+        CheckpointBarrier barrier = null;
+        while (iterator.hasNext()) {
+            BufferConsumerWithPartialRecordLength next = iterator.next();
+            BufferConsumer bufferConsumer = next.getBufferConsumer();
+
+            if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                    == bufferConsumer.getDataType()) {
+                barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+                // It may be a aborted barrier
+                if (barrier.getId() != checkpointId) {
+                    continue;
+                }
+                element = next;
+                break;
+            } else if (bufferConsumer.isBuffer()) {
+                try (BufferConsumer bc = bufferConsumer.copy()) {
+                    inflightBuffers.add(bc.build());
+                }
+            }
+        }
+
+        // 2. Make the barrier to be priority
+        checkNotNull(
+                element, "The checkpoint barrier=%d don't find in %s.", checkpointId, toString());
+        makeBarrierToPriority(element, barrier);
+
+        return needNotifyPriorityEvent();
+    }
+
+    private void makeBarrierToPriority(
+            BufferConsumerWithPartialRecordLength oldElement, CheckpointBarrier barrier)
+            throws IOException {
+        buffers.getAndRemove(oldElement::equals);
+        buffers.addPriorityElement(
+                new BufferConsumerWithPartialRecordLength(
+                        EventSerializer.toBufferConsumer(barrier.asUnaligned(), true), 0));
     }
 
     @Nullable
@@ -280,6 +439,12 @@ public class PipelinedSubpartition extends ResultSubpartition
             }
             buffers.clear();
 
+            if (channelStateFuture != null) {
+                IllegalStateException exception =
+                        new IllegalStateException("The PipelinedSubpartition is released");
+                completeChannelStateFuture(null, exception);
+            }
+
             view = readView;
             readView = null;
 
@@ -312,7 +477,10 @@ public class PipelinedSubpartition extends ResultSubpartition
                         buffers.peek();
                 BufferConsumer bufferConsumer =
                         bufferConsumerWithPartialRecordLength.getBufferConsumer();
-
+                if (Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER
+                        == bufferConsumer.getDataType()) {
+                    completeTimeoutableCheckpointBarrier(bufferConsumer);
+                }
                 buffer = buildSliceBuffer(bufferConsumerWithPartialRecordLength);
 
                 checkState(
@@ -376,6 +544,15 @@ public class PipelinedSubpartition extends ResultSubpartition
         }
     }
 
+    private void completeTimeoutableCheckpointBarrier(BufferConsumer bufferConsumer) {
+        CheckpointBarrier barrier = parseAndCheckTimeoutableCheckpointBarrier(bufferConsumer);
+        if (!isChannelStateFutureAvailable(barrier.getId())) {
+            // It happens on a previously aborted checkpoint.
+            return;
+        }
+        completeChannelStateFuture(Collections.emptyList(), null);
+    }
+
     void resumeConsumption() {
         synchronized (buffers) {
             checkState(isBlocked, "Should be blocked by checkpoint.");
@@ -627,4 +804,15 @@ public class PipelinedSubpartition extends ResultSubpartition
     BufferConsumerWithPartialRecordLength getNextBuffer() {
         return buffers.poll();
     }
+
+    /** for testing only. */
+    @VisibleForTesting
+    CompletableFuture<List<Buffer>> getChannelStateFuture() {
+        return channelStateFuture;
+    }
+
+    @VisibleForTesting
+    public long getChannelStateCheckpointId() {
+        return channelStateCheckpointId;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index ac99607331d..300d5a41061 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -61,6 +62,10 @@ public abstract class ResultSubpartition {
         parent.onConsumedSubpartition(getSubPartitionIndex());
     }
 
+    public abstract void alignedBarrierTimeout(long checkpointId) throws IOException;
+
+    public abstract void abortCheckpoint(long checkpointId, CheckpointException cause);
+
     @VisibleForTesting
     public final int add(BufferConsumer bufferConsumer) throws IOException {
         return add(bufferConsumer, 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 190ca452c67..2f10b357877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.api.EndOfData;
@@ -270,6 +271,16 @@ public class SortMergeResultPartition extends ResultPartition {
         }
     }
 
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        // Nothing to do.
+    }
+
+    @Override
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        // Nothing to do.
+    }
+
     private void broadcast(ByteBuffer record, DataType dataType) throws IOException {
         emit(record, 0, dataType, true);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
index 9e01c958259..f425d057c86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
@@ -32,6 +32,7 @@ import org.junit.runners.Parameterized.Parameters;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -59,11 +60,14 @@ public class ChannelStateWriteRequestDispatcherTest {
             new Object[] {empty(), asList(start(), completeIn(), 
completeOut())},
             new Object[] {empty(), asList(start(), writeIn(), completeIn())},
             new Object[] {empty(), asList(start(), writeOut(), completeOut())},
+            new Object[] {empty(), asList(start(), writeOutFuture(), 
completeOut())},
             new Object[] {empty(), asList(start(), completeIn(), writeOut())},
+            new Object[] {empty(), asList(start(), completeIn(), 
writeOutFuture())},
             new Object[] {empty(), asList(start(), completeOut(), writeIn())},
             // invalid without start
             new Object[] {of(IllegalArgumentException.class), 
singletonList(writeIn())},
             new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOut())},
+            new Object[] {of(IllegalArgumentException.class), 
singletonList(writeOutFuture())},
             new Object[] {of(IllegalArgumentException.class), 
singletonList(completeIn())},
             new Object[] {of(IllegalArgumentException.class), 
singletonList(completeOut())},
             // invalid double complete
@@ -80,6 +84,9 @@ public class ChannelStateWriteRequestDispatcherTest {
             new Object[] {
                 of(IllegalStateException.class), asList(start(), 
completeOut(), writeOut())
             },
+            new Object[] {
+                of(IllegalStateException.class), asList(start(), 
completeOut(), writeOutFuture())
+            },
             // invalid double start
             new Object[] {of(IllegalStateException.class), asList(start(), 
start())}
         };
@@ -113,6 +120,18 @@ public class ChannelStateWriteRequestDispatcherTest {
                         FreeingBufferRecycler.INSTANCE));
     }
 
+    /**
+     * Builds an output write request for subpartition (1, 1) whose data future is completed with a
+     * single 1-byte buffer only after the request itself has been created.
+     */
+    private static ChannelStateWriteRequest writeOutFuture() {
+        CompletableFuture<List<Buffer>> dataFuture = new CompletableFuture<>();
+        ChannelStateWriteRequest request =
+                write(CHECKPOINT_ID, new ResultSubpartitionInfo(1, 1), dataFuture);
+        // Complete the future only now, i.e. after the request already references it.
+        Buffer singleBuffer =
+                new NetworkBuffer(
+                        MemorySegmentFactory.allocateUnpooledSegment(1),
+                        FreeingBufferRecycler.INSTANCE);
+        dataFuture.complete(singletonList(singleBuffer));
+        return request;
+    }
+
     private static CheckpointStartRequest start() {
         return new CheckpointStartRequest(
                 CHECKPOINT_ID,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index b8910c85439..c77208f3ff7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
@@ -21,6 +21,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.CloseableIterator;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
 import static org.apache.flink.util.ExceptionUtils.rethrow;
 
 /**
@@ -80,6 +83,22 @@ public class MockChannelStateWriter implements 
ChannelStateWriter {
         }
     }
 
+    /**
+     * Recycles the output buffers carried by {@code data} instead of persisting them.
+     *
+     * <p>The future is not awaited: callers may complete it only after this method returns, so
+     * blocking on it here could stall or deadlock the calling thread. The buffers are recycled on
+     * whichever thread completes the future (or immediately if it is already complete). A future
+     * that completes exceptionally carries no buffers and is ignored.
+     */
+    @Override
+    public void addOutputDataFuture(
+            long checkpointId,
+            ResultSubpartitionInfo info,
+            int startSeqNum,
+            CompletableFuture<List<Buffer>> data)
+            throws IllegalArgumentException {
+        checkCheckpointId(checkpointId);
+        data.thenAccept(buffers -> buffers.forEach(Buffer::recycleBuffer));
+    }
+
     @Override
     public void finishInput(long checkpointId) {
         checkCheckpointId(checkpointId);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 9d336cbaf15..181aca1ed62 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -236,6 +237,7 @@ public class InputChannelTestUtils {
                                 NoOpFileChannelManager.INSTANCE,
                                 true,
                                 bufferSize);
+        parent.setChannelStateWriter(ChannelStateWriter.NO_OP);
         ResultSubpartition subpartition = parent.getAllPartitions()[0];
         for (BufferConsumer buffer : buffers) {
             subpartition.add(buffer);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index 4a6be76a3ba..25ee8161ba4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -61,6 +62,12 @@ public class MockResultPartitionWriter implements 
ResultPartitionWriter {
     @Override
     public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) 
throws IOException {}
 
+    /** No-op in this mock. */
+    @Override
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {}
+
+    /** No-op in this mock. */
+    @Override
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) 
{}
+
     @Override
     public void notifyEndOfData(StopMode mode) throws IOException {}
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 7e2b8a5b245..295b49d75d5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -29,7 +34,9 @@ import 
org.apache.flink.runtime.io.network.util.TestConsumerCallback;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.CheckedSupplier;
 
@@ -38,7 +45,10 @@ import org.junit.Assume;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -347,6 +357,139 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
         assertTrue(view.getFailureCause() instanceof CancelTaskException);
     }
 
+    /**
+     * Consumes timeoutable aligned barriers before they time out, on one subpartition, with 0, 1
+     * and 2 data buffers queued ahead of checkpoints 5, 6 and 7 respectively.
+     */
+    @Test
+    public void testConsumeTimeoutableCheckpointBarrierQuickly() throws Exception {
+        PipelinedSubpartition subpartition = createSubpartition();
+        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
+        assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, null, true, 0, false);
+
+        long checkpointId = 5L;
+        for (int dataBuffers = 0; dataBuffers <= 2; dataBuffers++) {
+            testConsumeQuicklyWithNDataBuffers(dataBuffers, subpartition, checkpointId++);
+        }
+    }
+
+    /**
+     * Queues {@code numberOfDataBuffers} data buffers followed by a timeoutable aligned barrier for
+     * {@code checkpointId}, then polls everything before any timeout is triggered.
+     *
+     * <p>Verifies that adding the barrier registers a channel state future for {@code
+     * checkpointId}. It also verifies that once the barrier is polled, the future is completed with
+     * an empty list and no longer held by the subpartition.
+     */
+    private void testConsumeQuicklyWithNDataBuffers(
+            int numberOfDataBuffers, PipelinedSubpartition subpartition, long 
checkpointId)
+            throws Exception {
+        // add data buffers and barrier
+        for (int i = 0; i < numberOfDataBuffers; i++) {
+            subpartition.add(createFilledFinishedBufferConsumer(4096));
+        }
+        subpartition.add(getTimeoutableBarrierBuffer(checkpointId));
+        assertEquals(checkpointId, subpartition.getChannelStateCheckpointId());
+        CompletableFuture<List<Buffer>> channelStateFuture = 
subpartition.getChannelStateFuture();
+        // A pending (not yet done) future exists, and all buffers plus the barrier are queued.
+        assertSubpartitionChannelStateFuturesAndQueuedBuffers(
+                subpartition, channelStateFuture, false, numberOfDataBuffers + 
1, false);
+
+        // poll data buffers first
+        for (int i = 0; i < numberOfDataBuffers; i++) {
+            pollBufferAndCheckType(subpartition, Buffer.DataType.DATA_BUFFER);
+        }
+        pollBufferAndCheckType(
+                subpartition, 
Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER);
+
+        // Consuming the barrier in time completes the future with no in-flight data to persist.
+        assertSubpartitionChannelStateFuturesAndQueuedBuffers(
+                subpartition, channelStateFuture, true, 0, true);
+        assertTrue(channelStateFuture.get().isEmpty());
+        // NOTE(review): presumably needed so the next iteration can poll again after a
+        // barrier was consumed — confirm.
+        subpartition.resumeConsumption();
+    }
+
+    /**
+     * Times out timeoutable aligned barriers on one subpartition, first with no data buffer queued
+     * ahead of the barrier (checkpoint 7), then with one (checkpoint 8).
+     */
+    @Test
+    public void testTimeoutAlignedToUnalignedBarrier() throws Exception {
+        PipelinedSubpartition subpartition = createSubpartition();
+        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
+        assertSubpartitionChannelStateFuturesAndQueuedBuffers(subpartition, 
null, true, 0, false);
+
+        // test without data buffer
+        testTimeoutWithNDataBuffers(0, subpartition, 7L);
+
+        // test with data buffer
+        testTimeoutWithNDataBuffers(1, subpartition, 8L);
+    }
+
+    /**
+     * Queues {@code numberOfDataBuffers} data buffers and a timeoutable aligned barrier for {@code
+     * checkpointId}, then triggers {@code alignedBarrierTimeout}.
+     *
+     * <p>Verifies three things:
+     *
+     * <ul>
+     *   <li>The channel state future is completed and released by the subpartition.
+     *   <li>The queue size is unchanged.
+     *   <li>The barrier is polled first as a prioritized event, and the future yields buffers equal
+     *       to the queued data buffers.
+     * </ul>
+     */
+    private void testTimeoutWithNDataBuffers(
+            int numberOfDataBuffers, PipelinedSubpartition subpartition, long 
checkpointId)
+            throws Exception {
+        // put data buffers and barrier
+        List<Buffer> expectedBuffers = new ArrayList<>();
+        for (int i = 0; i < numberOfDataBuffers; i++) {
+            BufferConsumer bufferConsumer = 
createFilledFinishedBufferConsumer(4096);
+            subpartition.add(bufferConsumer);
+            // Keep an independent view of the same data to compare with the channel state later.
+            expectedBuffers.add(bufferConsumer.copy().build());
+        }
+        subpartition.add(getTimeoutableBarrierBuffer(checkpointId));
+
+        assertEquals(checkpointId, subpartition.getChannelStateCheckpointId());
+        CompletableFuture<List<Buffer>> channelStateFuture = 
subpartition.getChannelStateFuture();
+        assertSubpartitionChannelStateFuturesAndQueuedBuffers(
+                subpartition, channelStateFuture, false, numberOfDataBuffers + 
1, false);
+
+        // After the timeout the future is done and released, but nothing has been dequeued yet.
+        subpartition.alignedBarrierTimeout(checkpointId);
+        assertSubpartitionChannelStateFuturesAndQueuedBuffers(
+                subpartition, channelStateFuture, true, numberOfDataBuffers + 
1, true);
+
+        // The barrier now overtakes the data buffers as a prioritized event.
+        pollBufferAndCheckType(subpartition, 
Buffer.DataType.PRIORITIZED_EVENT_BUFFER);
+        for (int i = 0; i < numberOfDataBuffers; i++) {
+            pollBufferAndCheckType(subpartition, Buffer.DataType.DATA_BUFFER);
+        }
+
+        assertEquals(expectedBuffers, channelStateFuture.get());
+    }
+
+    /**
+     * Polls the next buffer from {@code subpartition} and asserts that one is available and that it
+     * has the expected {@code dataType}.
+     *
+     * <p>The polled buffer is recycled afterwards, even if the type assertion fails, so the tests
+     * using this helper do not leak network buffers.
+     */
+    private void pollBufferAndCheckType(
+            PipelinedSubpartition subpartition, Buffer.DataType dataType) {
+        ResultSubpartition.BufferAndBacklog bufferAndBacklog = subpartition.pollBuffer();
+        assertNotNull(bufferAndBacklog);
+        Buffer buffer = bufferAndBacklog.buffer();
+        try {
+            assertEquals(dataType, buffer.getDataType());
+        } finally {
+            buffer.recycleBuffer();
+        }
+    }
+
+    /**
+     * Adding a second timeoutable barrier (checkpoint 11) while the channel state future of
+     * checkpoint 10 is still pending must surface an {@link IllegalStateException}.
+     */
+    @Test
+    public void testConcurrentTimeoutableCheckpointBarrier() throws Exception {
+        PipelinedSubpartition subpartition = createSubpartition();
+        subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP);
+
+        subpartition.add(getTimeoutableBarrierBuffer(10L));
+        assertEquals(10L, subpartition.getChannelStateCheckpointId());
+        CompletableFuture<List<Buffer>> checkpointFuture10 = 
subpartition.getChannelStateFuture();
+        assertNotNull(checkpointFuture10);
+
+        try {
+            // This must fail because concurrent unaligned checkpoints are not supported yet.
+            // The IllegalStateException may come from add(11L) itself or from the future of
+            // checkpoint 10; the fail() below is reached only if neither throws.
+            subpartition.add(getTimeoutableBarrierBuffer(11L));
+            checkpointFuture10.get();
+            fail("Should fail with an IllegalStateException.");
+        } catch (Throwable e) {
+            ExceptionUtils.assertThrowable(e, IllegalStateException.class);
+        }
+    }
+
+    /**
+     * Serializes a {@link CheckpointBarrier} for {@code checkpointId} with aligned-with-timeout
+     * checkpoint options (alignment timeout value 1000) into a non-priority buffer consumer.
+     */
+    private BufferConsumer getTimeoutableBarrierBuffer(long checkpointId) throws IOException {
+        CheckpointBarrier timeoutableBarrier =
+                new CheckpointBarrier(
+                        checkpointId,
+                        System.currentTimeMillis(),
+                        CheckpointOptions.alignedWithTimeout(
+                                CheckpointType.CHECKPOINT,
+                                CheckpointStorageLocationReference.getDefault(),
+                                1000));
+        boolean isPriorityEvent = false;
+        return EventSerializer.toBufferConsumer(timeoutableBarrier, isPriorityEvent);
+    }
+
+    /**
+     * Asserts the subpartition's channel-state bookkeeping.
+     *
+     * <p>Checks that the subpartition's current channel state future is absent exactly when {@code
+     * channelStateFutureIsNull} is true, and that it holds {@code numberOfQueuedBuffers} buffers.
+     * When a {@code channelStateFuture} handle is given, it also checks whether that future is
+     * done.
+     */
+    private void assertSubpartitionChannelStateFuturesAndQueuedBuffers(
+            PipelinedSubpartition subpartition,
+            CompletableFuture<List<Buffer>> channelStateFuture,
+            boolean channelStateFutureIsNull,
+            long numberOfQueuedBuffers,
+            boolean expectedFutureIsDone) {
+        boolean currentFutureAbsent = subpartition.getChannelStateFuture() == null;
+        assertEquals(channelStateFutureIsNull, currentFutureAbsent);
+        assertEquals(numberOfQueuedBuffers, subpartition.getNumberOfQueuedBuffers());
+        if (channelStateFuture == null) {
+            return;
+        }
+        assertEquals(expectedFutureIsDone, channelStateFuture.isDone());
+    }
+
     private void verifyViewReleasedAfterParentRelease(ResultSubpartition 
partition)
             throws Exception {
         // Add a bufferConsumer
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index f9cc4675f39..7306ad3a749 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -53,12 +53,15 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN;
@@ -86,6 +89,7 @@ public class ChannelPersistenceITCase {
     public void testReadWritten() throws Exception {
         byte[] inputChannelInfoData = randomBytes(1024);
         byte[] resultSubpartitionInfoData = randomBytes(1024);
+        byte[] resultSubpartitionInfoFutureData = randomBytes(1024);
         int partitionIndex = 0;
 
         SequentialChannelStateReader reader =
@@ -97,9 +101,12 @@ public class ChannelPersistenceITCase {
                                                 new InputChannelInfo(0, 0), 
inputChannelInfoData),
                                         singletonMap(
                                                 new 
ResultSubpartitionInfo(partitionIndex, 0),
-                                                resultSubpartitionInfoData))));
+                                                resultSubpartitionInfoData),
+                                        singletonMap(
+                                                new 
ResultSubpartitionInfo(partitionIndex, 1),
+                                                
resultSubpartitionInfoFutureData))));
 
-        NetworkBufferPool networkBufferPool = new NetworkBufferPool(4, 1024);
+        NetworkBufferPool networkBufferPool = new NetworkBufferPool(6, 1024);
         try {
             int numChannels = 1;
             InputGate gate = buildGate(networkBufferPool, numChannels);
@@ -107,12 +114,13 @@ public class ChannelPersistenceITCase {
             assertArrayEquals(
                     inputChannelInfoData, collectBytes(gate::pollNext, 
BufferOrEvent::getBuffer));
 
+            int subpartitions = 2;
             BufferWritingResultPartition resultPartition =
                     buildResultPartition(
                             networkBufferPool,
                             ResultPartitionType.PIPELINED,
                             partitionIndex,
-                            numChannels);
+                            subpartitions);
             reader.readOutputData(new BufferWritingResultPartition[] 
{resultPartition}, false);
             ResultSubpartitionView view =
                     resultPartition.createSubpartitionView(0, new 
NoOpBufferAvailablityListener());
@@ -121,6 +129,13 @@ public class ChannelPersistenceITCase {
                     collectBytes(
                             () -> Optional.ofNullable(view.getNextBuffer()),
                             BufferAndBacklog::buffer));
+            ResultSubpartitionView futureView =
+                    resultPartition.createSubpartitionView(1, new 
NoOpBufferAvailablityListener());
+            assertArrayEquals(
+                    resultSubpartitionInfoFutureData,
+                    collectBytes(
+                            () -> 
Optional.ofNullable(futureView.getNextBuffer()),
+                            BufferAndBacklog::buffer));
         } finally {
             networkBufferPool.destroy();
         }
@@ -224,11 +239,14 @@ public class ChannelPersistenceITCase {
     private ChannelStateWriteResult write(
             long checkpointId,
             Map<InputChannelInfo, byte[]> icMap,
-            Map<ResultSubpartitionInfo, byte[]> rsMap)
+            Map<ResultSubpartitionInfo, byte[]> rsMap,
+            Map<ResultSubpartitionInfo, byte[]> rsFutureMap)
             throws Exception {
-        int maxStateSize = sizeOfBytes(icMap) + sizeOfBytes(rsMap) + 
Long.BYTES * 2;
+        int maxStateSize =
+                sizeOfBytes(icMap) + sizeOfBytes(rsMap) + 
sizeOfBytes(rsFutureMap) + Long.BYTES * 3;
         Map<InputChannelInfo, Buffer> icBuffers = wrapWithBuffers(icMap);
         Map<ResultSubpartitionInfo, Buffer> rsBuffers = wrapWithBuffers(rsMap);
+        Map<ResultSubpartitionInfo, Buffer> rsFutureBuffers = 
wrapWithBuffers(rsFutureMap);
         try (ChannelStateWriterImpl writer =
                 new ChannelStateWriterImpl("test", 0, 
getStreamFactoryFactory(maxStateSize))) {
             writer.open();
@@ -244,6 +262,12 @@ public class ChannelPersistenceITCase {
                         ofElements(Buffer::recycleBuffer, e.getValue()));
             }
             writer.finishInput(checkpointId);
+            for (Map.Entry<ResultSubpartitionInfo, Buffer> e : 
rsFutureBuffers.entrySet()) {
+                CompletableFuture<List<Buffer>> dataFuture = new 
CompletableFuture<>();
+                writer.addOutputDataFuture(
+                        checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, 
dataFuture);
+                dataFuture.complete(singletonList(e.getValue()));
+            }
             for (Map.Entry<ResultSubpartitionInfo, Buffer> e : 
rsBuffers.entrySet()) {
                 writer.addOutputData(
                         checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, 
e.getValue());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 8a63074815f..93acf23d30b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -158,6 +159,14 @@ public class RecordWriterOutput<OUT> implements 
WatermarkGaugeExposingOutput<Str
         recordWriter.broadcastEvent(event, isPriorityEvent);
     }
 
+    /**
+     * Forwards the aligned-barrier timeout of {@code checkpointId} to the underlying record
+     * writer.
+     */
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        recordWriter.alignedBarrierTimeout(checkpointId);
+    }
+
+    /** Forwards the abort of {@code checkpointId} (with its cause) to the underlying record writer. */
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        recordWriter.abortCheckpoint(checkpointId, cause);
+    }
+
     public void flush() throws IOException {
         recordWriter.flushAll();
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
index f239eac23da..97e148f3c14 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerState.java
@@ -63,6 +63,7 @@ abstract class AbstractAlternatingAlignedBarrierHandlerState 
implements BarrierH
         }
 
         if (controller.allBarriersReceived()) {
+            controller.initInputsCheckpoint(checkpointBarrier);
             controller.triggerGlobalCheckpoint(checkpointBarrier);
             return finishCheckpoint();
         } else if (controller.isTimedOut(checkpointBarrier)) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
index da8a486fb3b..eacd9f58afb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCollectingBarriers.java
@@ -67,6 +67,7 @@ final class AlternatingCollectingBarriers extends 
AbstractAlternatingAlignedBarr
                         + "collecting aligned barrier state");
 
         if (controller.allBarriersReceived()) {
+            controller.initInputsCheckpoint(pendingCheckpointBarrier);
             controller.triggerGlobalCheckpoint(pendingCheckpointBarrier);
             return finishCheckpoint();
         } else if (controller.isTimedOut(pendingCheckpointBarrier)) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 75325c13810..d5f94d42412 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
@@ -335,6 +336,18 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
         }
     }
 
+    /**
+     * Notifies every stream output of this chain that the aligned barrier of {@code checkpointId}
+     * timed out.
+     *
+     * <p>NOTE(review): an IOException from one output stops the loop, so the remaining outputs are
+     * not notified — confirm this is acceptable.
+     */
+    public void alignedBarrierTimeout(long checkpointId) throws IOException {
+        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+            streamOutput.alignedBarrierTimeout(checkpointId);
+        }
+    }
+
+    /**
+     * Aborts {@code checkpointId} on every stream output of this chain.
+     *
+     * <p>NOTE(review): a RuntimeException from one output stops the loop, so the remaining outputs
+     * are not aborted — confirm this is acceptable.
+     */
+    public void abortCheckpoint(long checkpointId, CheckpointException cause) {
+        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+            streamOutput.abortCheckpoint(checkpointId, cause);
+        }
+    }
+
     /**
      * Execute {@link StreamOperator#close()} of each operator in the chain of 
this {@link
      * StreamTask}. Closing happens from <b>tail to head</b> operator in the 
chain.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 312f4f38eb3..f431fe90399 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -87,6 +87,7 @@ import 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
 import 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
 import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -440,6 +441,15 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
             environment.setCheckpointStorageAccess(checkpointStorageAccess);
 
+            // if the clock is not already set, then assign a default 
TimeServiceProvider
+            if (timerService == null) {
+                this.timerService = createTimerService("Time Trigger for " + 
getName());
+            } else {
+                this.timerService = timerService;
+            }
+
+            this.systemTimerService = createTimerService("System Time Trigger 
for " + getName());
+
             this.subtaskCheckpointCoordinator =
                     new SubtaskCheckpointCoordinatorImpl(
                             checkpointStorageAccess,
@@ -455,16 +465,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                     .get(
                                             ExecutionCheckpointingOptions
                                                     
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
-                            this::prepareInputSnapshot);
+                            this::prepareInputSnapshot,
+                            BarrierAlignmentUtil.createRegisterTimerCallback(
+                                    mainMailboxExecutor, systemTimerService));
 
-            // if the clock is not already set, then assign a default 
TimeServiceProvider
-            if (timerService == null) {
-                this.timerService = createTimerService("Time Trigger for " + 
getName());
-            } else {
-                this.timerService = timerService;
-            }
-
-            this.systemTimerService = createTimerService("System Time Trigger 
for " + getName());
             // Register to stop all timers and threads. Should be closed first.
             resourceCloser.registerCloseable(this::tryShutdownTimerService);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 7c96d49207f..76f413c4b49 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -38,9 +38,14 @@ import 
org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.Cancellable;
+import 
org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil.DelayableTimer;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.function.BiFunctionWithException;
 
 import org.slf4j.Logger;
@@ -49,6 +54,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -103,6 +109,19 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
     @GuardedBy("lock")
     private boolean closed;
 
+    private final DelayableTimer registerTimer;
+
+    private final Clock clock;
+
+    /** It always be called in Task Thread. */
+    private Cancellable alignmentTimer;
+
+    /**
+     * It is the checkpointId corresponding to alignmentTimer. And It should 
be always updated with
+     * {@link #alignmentTimer}.
+     */
+    private long alignmentCheckpointId;
+
     SubtaskCheckpointCoordinatorImpl(
             CheckpointStorageWorkerView checkpointStorage,
             String taskName,
@@ -115,7 +134,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
             boolean enableCheckpointAfterTasksFinished,
             BiFunctionWithException<
                             ChannelStateWriter, Long, CompletableFuture<Void>, 
CheckpointException>
-                    prepareInputSnapshot)
+                    prepareInputSnapshot,
+            DelayableTimer registerTimer)
             throws IOException {
         this(
                 checkpointStorage,
@@ -128,7 +148,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                 unalignedCheckpointEnabled,
                 enableCheckpointAfterTasksFinished,
                 prepareInputSnapshot,
-                DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS);
+                DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS,
+                registerTimer);
     }
 
     SubtaskCheckpointCoordinatorImpl(
@@ -144,7 +165,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
             BiFunctionWithException<
                             ChannelStateWriter, Long, CompletableFuture<Void>, 
CheckpointException>
                     prepareInputSnapshot,
-            int maxRecordAbortedCheckpoints)
+            int maxRecordAbortedCheckpoints,
+            DelayableTimer registerTimer)
             throws IOException {
         this(
                 checkpointStorage,
@@ -159,7 +181,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                 unalignedCheckpointEnabled
                         ? openChannelStateWriter(taskName, checkpointStorage, 
env)
                         : ChannelStateWriter.NO_OP,
-                enableCheckpointAfterTasksFinished);
+                enableCheckpointAfterTasksFinished,
+                registerTimer);
     }
 
     @VisibleForTesting
@@ -176,7 +199,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                     prepareInputSnapshot,
             int maxRecordAbortedCheckpoints,
             ChannelStateWriter channelStateWriter,
-            boolean enableCheckpointAfterTasksFinished)
+            boolean enableCheckpointAfterTasksFinished,
+            DelayableTimer registerTimer)
             throws IOException {
         this.checkpointStorage =
                 new 
CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage));
@@ -195,6 +219,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
         closeableRegistry.registerCloseable(this);
         this.closed = false;
         this.enableCheckpointAfterTasksFinished = 
enableCheckpointAfterTasksFinished;
+        this.registerTimer = registerTimer;
+        this.clock = SystemClock.getInstance();
     }
 
     private static ChannelStateWriter openChannelStateWriter(
@@ -229,9 +255,24 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
         // notify the coordinator that we decline this checkpoint
         env.declineCheckpoint(checkpointId, cause);
 
-        // notify all downstream operators that they should not wait for a 
barrier from us
         actionExecutor.runThrowing(
-                () -> operatorChain.broadcastEvent(new 
CancelCheckpointMarker(checkpointId)));
+                () -> {
+                    if (checkpointId == alignmentCheckpointId) {
+                        cancelAlignmentTimer();
+                    }
+                    // notify all downstream operators that they should not 
wait for a barrier from
+                    // us and abort checkpoint.
+                    operatorChain.abortCheckpoint(checkpointId, cause);
+                    operatorChain.broadcastEvent(new 
CancelCheckpointMarker(checkpointId));
+                });
+    }
+
+    private void cancelAlignmentTimer() {
+        if (alignmentTimer == null) {
+            return;
+        }
+        alignmentTimer.cancel();
+        alignmentTimer = null;
     }
 
     @Override
@@ -300,17 +341,26 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
         operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
 
         // Step (2): Send the checkpoint barrier downstream
-        operatorChain.broadcastEvent(
-                new CheckpointBarrier(metadata.getCheckpointId(), 
metadata.getTimestamp(), options),
-                options.isUnalignedCheckpoint());
-
-        // Step (3): Prepare to spill the in-flight buffers for input and 
output
-        if (options.isUnalignedCheckpoint()) {
+        LOG.debug(
+                "Task {} broadcastEvent at {}, triggerTime {}, passed time {}",
+                taskName,
+                System.currentTimeMillis(),
+                metadata.getTimestamp(),
+                System.currentTimeMillis() - metadata.getTimestamp());
+        CheckpointBarrier checkpointBarrier =
+                new CheckpointBarrier(metadata.getCheckpointId(), 
metadata.getTimestamp(), options);
+        operatorChain.broadcastEvent(checkpointBarrier, 
options.isUnalignedCheckpoint());
+
+        // Step (3): Register alignment timer to timeout aligned barrier to 
unaligned barrier
+        registerAlignmentTimer(metadata.getCheckpointId(), operatorChain, 
checkpointBarrier);
+
+        // Step (4): Prepare to spill the in-flight buffers for input and 
output
+        if (options.needsChannelState()) {
             // output data already written while broadcasting event
             channelStateWriter.finishOutput(metadata.getCheckpointId());
         }
 
-        // Step (4): Take the state snapshot. This should be largely 
asynchronous, to not impact
+        // Step (5): Take the state snapshot. This should be largely 
asynchronous, to not impact
         // progress of the
         // streaming topology
 
@@ -335,6 +385,33 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
         }
     }
 
+    private void registerAlignmentTimer(
+            long checkpointId,
+            OperatorChain<?, ?> operatorChain,
+            CheckpointBarrier checkpointBarrier) {
+        // The timer isn't triggered when the checkpoint completes quickly, so 
cancel timer here.
+        cancelAlignmentTimer();
+        if (!checkpointBarrier.getCheckpointOptions().isTimeoutable()) {
+            return;
+        }
+
+        long timerDelay = BarrierAlignmentUtil.getTimerDelay(clock, 
checkpointBarrier);
+
+        alignmentTimer =
+                registerTimer.registerTask(
+                        () -> {
+                            try {
+                                
operatorChain.alignedBarrierTimeout(checkpointId);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrowIOException(e);
+                            }
+                            alignmentTimer = null;
+                            return null;
+                        },
+                        Duration.ofMillis(timerDelay));
+        alignmentCheckpointId = checkpointId;
+    }
+
     @Override
     public void notifyCheckpointComplete(
             long checkpointId, OperatorChain<?, ?> operatorChain, 
Supplier<Boolean> isRunning)
@@ -439,6 +516,10 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
             channelStateWriter.start(id, checkpointOptions);
 
             prepareInflightDataSnapshot(id);
+        } else if (checkpointOptions.isTimeoutable()) {
+            // The output buffer may need to be snapshotted, so start the 
channelStateWriter here.
+            channelStateWriter.start(id, checkpointOptions);
+            channelStateWriter.finishInput(id);
         }
     }
 
@@ -635,7 +716,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
         long started = System.nanoTime();
 
         ChannelStateWriteResult channelStateWriteResult =
-                checkpointOptions.isUnalignedCheckpoint()
+                checkpointOptions.needsChannelState()
                         ? 
channelStateWriter.getAndRemoveWriteResult(checkpointId)
                         : ChannelStateWriteResult.EMPTY;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
index 650edf5abe9..1855df80cbe 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
@@ -112,7 +112,8 @@ public class MockSubtaskCheckpointCoordinatorBuilder {
                 unalignedCheckpointEnabled,
                 enableCheckpointAfterTasksFinished,
                 prepareInputSnapshot,
-                maxRecordAbortedCheckpoints);
+                maxRecordAbortedCheckpoints,
+                (callable, duration) -> () -> {});
     }
 
     private static class NonHandleAsyncException implements 
AsyncExceptionHandler {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index ef37551a02f..135799b8e44 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -72,6 +72,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -79,11 +80,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -511,6 +514,61 @@ public class SubtaskCheckpointCoordinatorTest {
         }
     }
 
+    @Test
+    public void 
testTimeoutableAlignedBarrierNotPriorityAndChannelStateResult() throws 
Exception {
+        long checkpointId = 66;
+        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+
+        try (SubtaskCheckpointCoordinator coordinator =
+                new MockSubtaskCheckpointCoordinatorBuilder()
+                        .setUnalignedCheckpointEnabled(true)
+                        .setEnvironment(mockEnvironment)
+                        .build()) {
+            AtomicReference<Boolean> broadcastedPriorityEvent = new 
AtomicReference<>(null);
+            AtomicReference<ChannelStateWriter.ChannelStateWriteResult> 
channelStateResult =
+                    new AtomicReference<>(null);
+            final OperatorChain<?, ?> operatorChain =
+                    new RegularOperatorChain(
+                            new NoOpStreamTask<>(new DummyEnvironment()), new 
NonRecordWriter<>()) {
+                        @Override
+                        public void broadcastEvent(AbstractEvent event, 
boolean isPriorityEvent)
+                                throws IOException {
+                            super.broadcastEvent(event, isPriorityEvent);
+                            broadcastedPriorityEvent.set(isPriorityEvent);
+                        }
+
+                        @Override
+                        public void snapshotState(
+                                Map operatorSnapshotsInProgress,
+                                CheckpointMetaData checkpointMetaData,
+                                CheckpointOptions checkpointOptions,
+                                Supplier isRunning,
+                                ChannelStateWriter.ChannelStateWriteResult 
channelStateWriteResult,
+                                CheckpointStreamFactory storage) {
+                            channelStateResult.set(channelStateWriteResult);
+                        }
+                    };
+
+            CheckpointOptions checkpointOptions =
+                    new CheckpointOptions(
+                            CHECKPOINT,
+                            CheckpointStorageLocationReference.getDefault(),
+                            CheckpointOptions.AlignmentType.ALIGNED,
+                            200);
+            coordinator.initInputsCheckpoint(checkpointId, checkpointOptions);
+            coordinator.checkpointState(
+                    new CheckpointMetaData(checkpointId, 
System.currentTimeMillis()),
+                    checkpointOptions,
+                    new CheckpointMetricsBuilder(),
+                    operatorChain,
+                    false,
+                    () -> true);
+
+            assertEquals(false, broadcastedPriorityEvent.get());
+            assertNotNull(channelStateResult.get());
+        }
+    }
+
     private OperatorChain<?, ?> getOperatorChain(MockEnvironment 
mockEnvironment) throws Exception {
         return new RegularOperatorChain<>(
                 new MockStreamTaskBuilder(mockEnvironment).build(), new 
NonRecordWriter<>());
@@ -686,6 +744,7 @@ public class SubtaskCheckpointCoordinatorTest {
                 (unused1, unused2) -> CompletableFuture.completedFuture(null),
                 0,
                 channelStateWriter,
-                true);
+                true,
+                (callable, duration) -> () -> {});
     }
 }

Reply via email to