This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa731288518c8ebf66f40b4e0e9b1929546b6257
Author: Yun Tang <[email protected]>
AuthorDate: Mon May 11 13:45:45 2020 +0800

    [FLINK-8871][checkpoint] Support to cancel checkpoing via notification on 
task side
---
 .../connectors/fs/bucketing/BucketingSink.java     |   4 +
 .../connectors/gcp/pubsub/PubSubSource.java        |   4 +
 .../gcp/pubsub/common/AcknowledgeOnCheckpoint.java |   4 +
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |   4 +
 .../connectors/kafka/KafkaConsumerTestBase.java    |   4 +
 .../connectors/kafka/KafkaProducerTestBase.java    |   4 +
 .../kafka/testutils/FailingIdentityMapper.java     |   4 +
 .../connectors/kafka/testutils/IntegerSource.java  |   4 +
 .../flink/streaming/tests/FailureMapper.java       |   4 +
 .../HeavyDeploymentStressTestProgram.java          |   4 +
 .../StickyAllocationAndLocalRecoveryTestJob.java   |   4 +
 .../api/runtime/SavepointTaskStateManager.java     |   5 +
 .../flink/state/api/output/SnapshotUtilsTest.java  |   4 +
 .../itcases/AbstractQueryableStateTestBase.java    |   4 +
 .../runtime/jobgraph/tasks/AbstractInvokable.java  |  12 +
 .../flink/runtime/state/CheckpointListener.java    |   8 +
 .../runtime/state/NoOpTaskLocalStateStoreImpl.java |   4 +
 .../flink/runtime/state/TaskLocalStateStore.java   |   6 +
 .../runtime/state/TaskLocalStateStoreImpl.java     |   9 +
 .../flink/runtime/state/TaskStateManagerImpl.java  |  10 +-
 .../runtime/state/heap/HeapKeyedStateBackend.java  |   5 +
 .../org/apache/flink/runtime/taskmanager/Task.java |  28 ++
 .../runtime/state/TaskLocalStateStoreImplTest.java |  23 +-
 .../runtime/state/TestTaskLocalStateStore.java     |  19 ++
 .../flink/runtime/state/TestTaskStateManager.java  |  11 +
 .../state/ttl/mock/MockKeyedStateBackend.java      |   5 +
 .../streaming/state/RocksDBKeyedStateBackend.java  |   7 +
 .../state/snapshot/RocksFullSnapshotStrategy.java  |   5 +
 .../snapshot/RocksIncrementalSnapshotStrategy.java |   7 +
 .../functions/sink/TwoPhaseCommitSinkFunction.java |   4 +
 .../sink/filesystem/StreamingFileSink.java         |   4 +
 .../source/MessageAcknowledgingSourceBase.java     |   4 +
 .../api/operators/AbstractStreamOperator.java      |   5 +
 .../api/operators/AbstractStreamOperatorV2.java    |   5 +
 .../api/operators/StreamOperatorStateHandler.java  |   6 +
 .../api/operators/collect/CollectSinkFunction.java |   4 +
 .../runtime/tasks/AsyncCheckpointRunnable.java     |  22 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |   7 +
 .../tasks/SubtaskCheckpointCoordinator.java        |  12 +
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 198 +++++++++++++-
 .../AbstractUdfStreamOperatorLifecycleTest.java    |   1 +
 .../runtime/tasks/LocalStateForwardingTest.java    |   3 +-
 .../MockSubtaskCheckpointCoordinatorBuilder.java   |  14 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java    | 297 ++++++++++++++++++++-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   6 +
 .../runtime/utils/FailingCollectionSource.java     |   4 +
 .../runtime/stream/FsStreamingSinkITCaseBase.scala |   3 +
 .../flink/streaming/util/FiniteTestSource.java     |   4 +
 .../jobmaster/JobMasterStopWithSavepointIT.java    |   5 +
 .../jobmaster/JobMasterTriggerSavepointITCase.java |   5 +
 .../checkpointing/CoStreamCheckpointingITCase.java |   4 +
 .../ContinuousFileProcessingCheckpointITCase.java  |   4 +
 .../KeyedStateCheckpointingITCase.java             |   4 +
 .../checkpointing/StateCheckpointedITCase.java     |   4 +
 .../StreamCheckpointNotifierITCase.java            |  20 ++
 .../checkpointing/UnalignedCheckpointITCase.java   |   8 +
 .../ZooKeeperHighAvailabilityITCase.java           |   4 +
 .../utils/AccumulatingIntegerSink.java             |   4 +
 .../utils/CancellingIntegerSource.java             |   4 +
 .../test/checkpointing/utils/FailingSource.java    |   4 +
 .../jar/CheckpointedStreamingProgram.java          |   4 +
 .../jar/CheckpointingCustomKvStateProgram.java     |   4 +
 .../ReinterpretDataStreamAsKeyedStreamITCase.java  |   4 +
 63 files changed, 877 insertions(+), 24 deletions(-)

diff --git 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index ec14cce..78cefaf 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -733,6 +733,10 @@ public class BucketingSink<T>
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
                Preconditions.checkNotNull(restoredBucketStates, "The operator 
has not been properly initialized.");
 
diff --git 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
index 1472bb2..8007fc1 100644
--- 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
+++ 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
@@ -194,6 +194,10 @@ public class PubSubSource<OUT> extends 
RichSourceFunction<OUT>
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public List<AcknowledgeIdsForCheckpoint<String>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
                return acknowledgeOnCheckpoint.snapshotState(checkpointId, 
timestamp);
        }
diff --git 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java
 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java
index f538b69..6b194b6 100644
--- 
a/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java
+++ 
b/flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.java
@@ -83,6 +83,10 @@ public class AcknowledgeOnCheckpoint<ACKID extends 
Serializable> implements Chec
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public List<AcknowledgeIdsForCheckpoint<ACKID>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
                acknowledgeIdsPerCheckpoint.add(new 
AcknowledgeIdsForCheckpoint<>(checkpointId, 
acknowledgeIdsForPendingCheckpoint));
                acknowledgeIdsForPendingCheckpoint = new ArrayList<>();
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 84057b0..733011f 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -1035,6 +1035,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
        // 
------------------------------------------------------------------------
        //  Kafka Consumer specific methods
        // 
------------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 95688b2..d99f8d9 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -2250,6 +2250,10 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBaseWithFlink {
                }
 
                @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
+               @Override
                public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
                        return Collections.singletonList(this.numElementsTotal);
                }
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index c0c9844..2b586fe 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -531,6 +531,10 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBaseWithFlink {
                }
 
                @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
+               @Override
                public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
                        if (!triggeredShutdown) {
                                lastSnapshotedElementBeforeShutdown = 
numElementsTotal;
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
index bd412c9..9919f1e 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -95,6 +95,10 @@ public class FailingIdentityMapper<T> extends 
RichMapFunction<T, T> implements
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) 
throws Exception {
                return Collections.singletonList(numElementsTotal);
        }
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
index 25a3cea..f471df4 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
@@ -127,4 +127,8 @@ public class IntegerSource
                        blocker.notifyAll();
                }
        }
+
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
 }
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
index a3a1c25..458f9b2 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/FailureMapper.java
@@ -71,6 +71,10 @@ public class FailureMapper<T> extends RichMapFunction<T, T> 
implements Checkpoin
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
        private boolean isReachedFailureThreshold() {
                return numProcessedRecords >= 
numProcessedRecordsFailureThreshold
                        && numCompleteCheckpoints >= 
numCompleteCheckpointsFailureThreshold
diff --git 
a/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
 
b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
index d65583f..68b6d24 100644
--- 
a/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-heavy-deployment-stress-test/src/main/java/org/apache/flink/deployment/HeavyDeploymentStressTestProgram.java
@@ -146,5 +146,9 @@ public class HeavyDeploymentStressTestProgram {
                public void notifyCheckpointComplete(long checkpointId) {
                        readyToFail = true;
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 }
diff --git 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
index b03791e..990baa9 100644
--- 
a/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
+++ 
b/flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
@@ -378,6 +378,10 @@ public class StickyAllocationAndLocalRecoveryTestJob {
                        failTask = currentSchedulingAndFailureInfo.failingTask;
                }
 
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
                private boolean shouldTaskFailForThisAttempt() {
                        RuntimeContext runtimeContext = getRuntimeContext();
                        int numSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index 9563e40..4fb15a2 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -78,6 +78,11 @@ final class SavepointTaskStateManager implements 
TaskStateManager {
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               throw new UnsupportedOperationException(MSG);
+       }
+
+       @Override
        public void close() {
        }
 }
diff --git 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index 5668284..36b4d63 100644
--- 
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ 
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -131,6 +131,10 @@ public class SnapshotUtilsTest {
                }
 
                @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
+               @Override
                public void setCurrentKey(Object key) {
                        ACTUAL_ORDER_TRACKING.add("setCurrentKey");
                }
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 83444e8..d9e3c91 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -1124,6 +1124,10 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                LATEST_CHECKPOINT_ID.set(checkpointId);
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 72586ab..af57f97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -278,6 +278,18 @@ public abstract class AbstractInvokable {
                throw new 
UnsupportedOperationException(String.format("notifyCheckpointCompleteAsync not 
supported by %s", this.getClass().getName()));
        }
 
+       /**
+        * Invoked when a checkpoint has been aborted, i.e., when the 
checkpoint coordinator has received a decline message
+        * from one task and try to abort the targeted checkpoint by 
notification.
+        *
+        * @param checkpointId The ID of the checkpoint that is aborted.
+        *
+        * @return future that completes when the notification has been 
processed by the task.
+        */
+       public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+               throw new 
UnsupportedOperationException(String.format("notifyCheckpointAbortAsync not 
supported by %s", this.getClass().getName()));
+       }
+
        public void dispatchOperatorEvent(OperatorID operator, 
SerializedValue<OperatorEvent> event) throws FlinkException {
                throw new UnsupportedOperationException("dispatchOperatorEvent 
not supported by " + getClass().getName());
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
index 0c99316..13c8e39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -38,4 +38,12 @@ public interface CheckpointListener {
         * @throws Exception
         */
        void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+       /**
+        * This method is called as a notification once a distributed 
checkpoint has been aborted.
+        *
+        * @param checkpointId The ID of the checkpoint that has been aborted.
+        * @throws Exception
+        */
+       void notifyCheckpointAborted(long checkpointId) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
index 11841a1..aece4aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/NoOpTaskLocalStateStoreImpl.java
@@ -66,6 +66,10 @@ public final class NoOpTaskLocalStateStoreImpl implements 
OwnedTaskLocalStateSto
        }
 
        @Override
+       public void abortCheckpoint(long abortedCheckpointId) {
+       }
+
+       @Override
        public void pruneMatchingCheckpoints(LongPredicate matcher) {
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
index b0d8a82..78f5068 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
@@ -68,6 +68,12 @@ public interface TaskLocalStateStore {
        void confirmCheckpoint(long confirmedCheckpointId);
 
        /**
+        * Notifies that the checkpoint with the given id was confirmed as 
aborted. This prunes the checkpoint history
+        * and removes states with a checkpoint id that is equal to the newly 
aborted checkpoint id.
+        */
+       void abortCheckpoint(long abortedCheckpointId);
+
+       /**
         * Remove all checkpoints from the store that match the given predicate.
         * @param matcher the predicate that selects the checkpoints for 
pruning.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index a57a7ef..52d7811 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -225,6 +225,15 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
        }
 
        @Override
+       public void abortCheckpoint(long abortedCheckpointId) {
+
+               LOG.debug("Received abort information for checkpoint {} in 
subtask ({} - {} - {}). Starting to prune history.",
+                       abortedCheckpointId, jobID, jobVertexID, subtaskIndex);
+
+               pruneCheckpoints(snapshotCheckpointId -> snapshotCheckpointId 
== abortedCheckpointId, false);
+       }
+
+       @Override
        public void pruneMatchingCheckpoints(@Nonnull LongPredicate matcher) {
 
                pruneCheckpoints(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index a42e808..f2ecde0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -180,13 +180,21 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
        }
 
        /**
-        * Tracking when local state can be disposed.
+        * Tracking when local state can be confirmed and disposed.
         */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                localStateStore.confirmCheckpoint(checkpointId);
        }
 
+       /**
+        * Tracking when some local state can be disposed.
+        */
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               localStateStore.abortCheckpoint(checkpointId);
+       }
+
        @Override
        public void close() throws Exception {
                channelStateReader.close();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 7e184ab..68828ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -312,6 +312,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               // nothing to do
+       }
+
+       @Override
        public <N, S extends State, T> void applyToAllKeys(
                final N namespace,
                final TypeSerializer<N> namespaceSerializer,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2e3c9b6..bd4671f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1218,6 +1218,34 @@ public class Task implements Runnable, TaskSlotPayload, 
TaskActions, PartitionPr
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(final long checkpointID) {
+               final AbstractInvokable invokable = this.invokable;
+
+               if (executionState == ExecutionState.RUNNING && invokable != 
null) {
+                       try {
+                               
invokable.notifyCheckpointAbortAsync(checkpointID);
+                       }
+                       catch (RejectedExecutionException ex) {
+                               // This may happen if the mailbox is closed. It 
means that the task is shutting down, so we just ignore it.
+                               LOG.debug(
+                                       "Notify checkpoint abort {} for {} ({}) 
was rejected by the mailbox",
+                                       checkpointID, taskNameWithSubtask, 
executionId);
+                       }
+                       catch (Throwable t) {
+                               if (getExecutionState() == 
ExecutionState.RUNNING) {
+                                       // fail task if checkpoint aborted 
notification failed.
+                                       failExternally(new RuntimeException(
+                                               "Error while aborting 
checkpoint",
+                                               t));
+                               }
+                       }
+               }
+               else {
+                       LOG.info("Ignoring checkpoint aborted notification for 
non-running task {}.", taskNameWithSubtask);
+               }
+       }
+
        /**
         * Dispatches an operator event to the invokable task.
         *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 7531783..784015b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -154,6 +154,21 @@ public class TaskLocalStateStoreImplTest {
        }
 
        /**
+        * Tests pruning of target previous checkpoints if that checkpoint is 
aborted.
+        */
+       @Test
+       public void abortCheckpoint() throws Exception {
+
+               final int chkCount = 4;
+               final int aborted = chkCount - 2;
+               List<TaskStateSnapshot> taskStateSnapshots = 
storeStates(chkCount);
+               taskLocalStateStore.abortCheckpoint(aborted);
+               checkPrunedAndDiscarded(taskStateSnapshots, aborted, aborted + 
1);
+               checkStoredAsExpected(taskStateSnapshots, 0, aborted);
+               checkStoredAsExpected(taskStateSnapshots, aborted + 1, 
chkCount);
+       }
+
+       /**
         * Tests that disposal of a {@link TaskLocalStateStoreImpl} works and 
discards all local states.
         */
        @Test
@@ -167,16 +182,16 @@ public class TaskLocalStateStoreImplTest {
                checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount);
        }
 
-       private void checkStoredAsExpected(List<TaskStateSnapshot> history, int 
off, int len) throws Exception {
-               for (int i = off; i < len; ++i) {
+       private void checkStoredAsExpected(List<TaskStateSnapshot> history, int 
start, int end) throws Exception {
+               for (int i = start; i < end; ++i) {
                        TaskStateSnapshot expected = history.get(i);
                        Assert.assertTrue(expected == 
taskLocalStateStore.retrieveLocalState(i));
                        Mockito.verify(expected, 
Mockito.never()).discardState();
                }
        }
 
-       private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, 
int off, int len) throws Exception {
-               for (int i = off; i < len; ++i) {
+       private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, 
int start, int end) throws Exception {
+               for (int i = start; i < end; ++i) {
                        
Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
                        Mockito.verify(history.get(i)).discardState();
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
index 2ade3e6..e92a34a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
@@ -105,6 +105,25 @@ public class TestTaskLocalStateStore implements 
TaskLocalStateStore {
        }
 
        @Override
+       public void abortCheckpoint(long abortedCheckpointId) {
+               Preconditions.checkState(!disposed);
+               Iterator<Map.Entry<Long, TaskStateSnapshot>> iterator = 
taskStateSnapshotsByCheckpointID.entrySet().iterator();
+               while (iterator.hasNext()) {
+                       Map.Entry<Long, TaskStateSnapshot> entry = 
iterator.next();
+                       if (entry.getKey() == abortedCheckpointId) {
+                               iterator.remove();
+                               try {
+                                       entry.getValue().discardState();
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       } else if (entry.getKey() > abortedCheckpointId){
+                               break;
+                       }
+               }
+       }
+
+       @Override
        public void pruneMatchingCheckpoints(LongPredicate matcher) {
                
taskStateSnapshotsByCheckpointID.keySet().removeIf(matcher::test);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 1ce41b3..ae6c022 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -47,6 +47,7 @@ public class TestTaskStateManager implements TaskStateManager 
{
 
        private long reportedCheckpointId;
        private long notifiedCompletedCheckpointId;
+       private long notifiedAbortedCheckpointId;
 
        private JobID jobId;
        private ExecutionAttemptID executionAttemptID;
@@ -88,6 +89,7 @@ public class TestTaskStateManager implements TaskStateManager 
{
                this.taskManagerTaskStateSnapshotsByCheckpointId = new 
HashMap<>();
                this.reportedCheckpointId = -1L;
                this.notifiedCompletedCheckpointId = -1L;
+               this.notifiedAbortedCheckpointId = -1L;
        }
 
        @Override
@@ -175,6 +177,11 @@ public class TestTaskStateManager implements 
TaskStateManager {
                this.notifiedCompletedCheckpointId = checkpointId;
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               this.notifiedAbortedCheckpointId = checkpointId;
+       }
+
        public JobID getJobId() {
                return jobId;
        }
@@ -227,6 +234,10 @@ public class TestTaskStateManager implements 
TaskStateManager {
                return notifiedCompletedCheckpointId;
        }
 
+       public long getNotifiedAbortedCheckpointId() {
+               return notifiedAbortedCheckpointId;
+       }
+
        public void setReportedCheckpointId(long reportedCheckpointId) {
                this.reportedCheckpointId = reportedCheckpointId;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index 5c82b80..5cb6866 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -167,6 +167,11 @@ public class MockKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               // noop
+       }
+
+       @Override
        public <N> Stream<K> getKeys(String state, N namespace) {
                return stateValues.get(state).entrySet().stream()
                        .filter(e -> e.getValue().containsKey(namespace))
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 2ddb79b..61d8688 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -464,6 +464,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) throws Exception 
{
+               
checkpointSnapshotStrategy.notifyCheckpointAborted(checkpointId);
+
+               savepointSnapshotStrategy.notifyCheckpointAborted(checkpointId);
+       }
+
        /**
         * Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
         *
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
index 1085283..751eed8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -150,6 +150,11 @@ public class RocksFullSnapshotStrategy<K> extends 
RocksDBSnapshotStrategyBase<K>
                // nothing to do.
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               // nothing to do.
+       }
+
        private SupplierWithException<CheckpointStreamWithResultProvider, 
Exception> createCheckpointStreamSupplier(
                long checkpointId,
                CheckpointStreamFactory primaryStreamFactory,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 23f4574..77b1342 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -179,6 +179,13 @@ public class RocksIncrementalSnapshotStrategy<K> extends 
RocksDBSnapshotStrategy
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(long abortedCheckpointId) {
+               synchronized (materializedSstFiles) {
+                       
materializedSstFiles.keySet().remove(abortedCheckpointId);
+               }
+       }
+
        @Nonnull
        private SnapshotDirectory prepareLocalSnapshotDirectory(long 
checkpointId) throws IOException {
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index 6a42fb9..361bf3f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -305,6 +305,10 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, 
CONTEXT>
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
                // this is like the pre-commit of a 2-phase-commit transaction
                // we are ready to commit and remember the transaction
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 54abac4..0962799 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -424,6 +424,10 @@ public class StreamingFileSink<IN>
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
                Preconditions.checkState(helper != null, "sink has not been 
initialized");
                this.helper.snapshotState(context.getCheckpointId());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index ffb2015..3a2a5ca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -241,4 +241,8 @@ public abstract class MessageAcknowledgingSourceBase<Type, 
UId>
                        }
                }
        }
+
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a698254..a249d5e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -344,6 +344,11 @@ public abstract class AbstractStreamOperator<OUT>
                stateHandler.notifyCheckpointComplete(checkpointId);
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) throws Exception 
{
+               stateHandler.notifyCheckpointAborted(checkpointId);
+       }
+
        // 
------------------------------------------------------------------------
        //  Properties and Services
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 56533db..819b254 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -290,6 +290,11 @@ public abstract class AbstractStreamOperatorV2<OUT> 
implements StreamOperator<OU
                stateHandler.notifyCheckpointComplete(checkpointId);
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) throws Exception 
{
+               stateHandler.notifyCheckpointAborted(checkpointId);
+       }
+
        // 
------------------------------------------------------------------------
        //  Properties and Services
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
index 99123b1..ed03907 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
@@ -222,6 +222,12 @@ public class StreamOperatorStateHandler {
                }
        }
 
+       public void notifyCheckpointAborted(long checkpointId) throws Exception 
{
+               if (keyedStateBackend != null) {
+                       keyedStateBackend.notifyCheckpointAborted(checkpointId);
+               }
+       }
+
        @SuppressWarnings("unchecked")
        public <K> KeyedStateBackend<K> getKeyedStateBackend() {
                return (KeyedStateBackend<K>) keyedStateBackend;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
index 64a3ffe..a687283a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java
@@ -287,6 +287,10 @@ public class CollectSinkFunction<IN> extends 
RichSinkFunction<IN> implements Che
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
        public void setOperatorEventGateway(OperatorEventGateway eventGateway) {
                this.eventGateway = eventGateway;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index b09c5a9..e89e962 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -36,6 +35,7 @@ import java.io.Closeable;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -46,10 +46,11 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
 
        public static final Logger LOG = 
LoggerFactory.getLogger(AsyncCheckpointRunnable.class);
        private final String taskName;
-       private final CloseableRegistry closeableRegistry;
+       private final Consumer<AsyncCheckpointRunnable> registerConsumer;
+       private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
        private final Environment taskEnvironment;
 
-       private enum AsyncCheckpointState {
+       enum AsyncCheckpointState {
                RUNNING,
                DISCARDED,
                COMPLETED
@@ -70,7 +71,8 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                        Future<?> channelWrittenFuture,
                        long asyncStartNanos,
                        String taskName,
-                       CloseableRegistry closeableRegistry,
+                       Consumer<AsyncCheckpointRunnable> register,
+                       Consumer<AsyncCheckpointRunnable> unregister,
                        Environment taskEnvironment,
                        AsyncExceptionHandler asyncExceptionHandler) {
 
@@ -80,7 +82,8 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                this.channelWrittenFuture = checkNotNull(channelWrittenFuture);
                this.asyncStartNanos = asyncStartNanos;
                this.taskName = checkNotNull(taskName);
-               this.closeableRegistry = checkNotNull(closeableRegistry);
+               this.registerConsumer = register;
+               this.unregisterConsumer = unregister;
                this.taskEnvironment = checkNotNull(taskEnvironment);
                this.asyncExceptionHandler = 
checkNotNull(asyncExceptionHandler);
        }
@@ -89,7 +92,7 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
        public void run() {
                FileSystemSafetyNet.initializeSafetyNetForThread();
                try {
-                       closeableRegistry.registerCloseable(this);
+                       registerConsumer.accept(this);
 
                        TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = 
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                        TaskStateSnapshot localTaskOperatorSubtaskStates = new 
TaskStateSnapshot(operatorSnapshotsInProgress.size());
@@ -140,7 +143,7 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                        }
                        handleExecutionException(e);
                } finally {
-                       closeableRegistry.unregisterCloseable(this);
+                       unregisterConsumer.accept(this);
                        
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                }
        }
@@ -229,6 +232,10 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                }
        }
 
+       long getCheckpointId() {
+               return checkpointMetaData.getCheckpointId();
+       }
+
        private void cleanup() throws Exception {
                LOG.debug(
                        "Cleanup AsyncCheckpointRunnable for checkpoint {} of 
{}.",
@@ -259,4 +266,5 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                        taskName,
                        checkpointMetaData.getCheckpointId());
        }
+
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 0bcbc30..1c346b1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -933,6 +933,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                }
        }
 
+       @Override
+       public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
+               return 
mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).submit(
+                       () -> 
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, this::isRunning),
+                       "checkpoint %d aborted", checkpointId);
+       }
+
        private void tryShutdownTimerService() {
 
                if (!timerService.isTerminated()) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
index d7352c6..2922735 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
@@ -65,4 +65,16 @@ interface SubtaskCheckpointCoordinator extends Closeable {
                long checkpointId,
                OperatorChain<?, ?> operatorChain,
                Supplier<Boolean> isRunning) throws Exception;
+
+       /**
+        * Notified on the task side once a distributed checkpoint has been 
aborted.
+        *
+        * @param checkpointId The checkpoint id to notify as been completed.
+        * @param operatorChain The chain of operators executed by the task.
+        * @param isRunning Whether the task is running.
+        */
+       void notifyCheckpointAborted(
+               long checkpointId,
+               OperatorChain<?, ?> operatorChain,
+               Supplier<Boolean> isRunning) throws Exception;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 8a55aa6..7508a16 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
@@ -41,19 +42,31 @@ import 
org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiFunctionWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
@@ -62,10 +75,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator 
{
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
+       private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
 
        private final CachingCheckpointStorageWorkerView checkpointStorage;
        private final String taskName;
-       private final CloseableRegistry closeableRegistry;
        private final ExecutorService executorService;
        private final Environment env;
        private final AsyncExceptionHandler asyncExceptionHandler;
@@ -73,6 +86,19 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
        private final StreamTaskActionExecutor actionExecutor;
        private final boolean unalignedCheckpointEnabled;
        private final BiFunctionWithException<ChannelStateWriter, Long, 
CompletableFuture<Void>, IOException> prepareInputSnapshot;
+       /** The IDs of the checkpoint for which we are notified aborted. */
+       private final Set<Long> abortedCheckpointIds;
+       private long lastCheckpointId;
+
+       /** Lock that guards state of AsyncCheckpointRunnable registry. **/
+       private final Object lock;
+
+       @GuardedBy("lock")
+       private final Map<Long, AsyncCheckpointRunnable> checkpoints;
+
+       /** Indicates if this registry is closed. */
+       @GuardedBy("lock")
+       private boolean closed;
 
        SubtaskCheckpointCoordinatorImpl(
                        CheckpointStorageWorkerView checkpointStorage,
@@ -84,9 +110,34 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                        AsyncExceptionHandler asyncExceptionHandler,
                        boolean unalignedCheckpointEnabled,
                        BiFunctionWithException<ChannelStateWriter, Long, 
CompletableFuture<Void>, IOException> prepareInputSnapshot) throws IOException {
+               this(checkpointStorage,
+                       taskName,
+                       actionExecutor,
+                       closeableRegistry,
+                       executorService,
+                       env,
+                       asyncExceptionHandler,
+                       unalignedCheckpointEnabled,
+                       prepareInputSnapshot,
+                       DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS);
+       }
+
+       @VisibleForTesting
+       SubtaskCheckpointCoordinatorImpl(
+                       CheckpointStorageWorkerView checkpointStorage,
+                       String taskName,
+                       StreamTaskActionExecutor actionExecutor,
+                       CloseableRegistry closeableRegistry,
+                       ExecutorService executorService,
+                       Environment env,
+                       AsyncExceptionHandler asyncExceptionHandler,
+                       boolean unalignedCheckpointEnabled,
+                       BiFunctionWithException<ChannelStateWriter, Long, 
CompletableFuture<Void>, IOException> prepareInputSnapshot,
+                       int maxRecordAbortedCheckpoints) throws IOException {
                this.checkpointStorage = new 
CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage));
                this.taskName = checkNotNull(taskName);
-               this.closeableRegistry = checkNotNull(closeableRegistry);
+               this.checkpoints = new HashMap<>();
+               this.lock = new Object();
                this.executorService = checkNotNull(executorService);
                this.env = checkNotNull(env);
                this.asyncExceptionHandler = 
checkNotNull(asyncExceptionHandler);
@@ -94,7 +145,10 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                this.channelStateWriter = unalignedCheckpointEnabled ? 
openChannelStateWriter() : ChannelStateWriter.NO_OP;
                this.unalignedCheckpointEnabled = unalignedCheckpointEnabled;
                this.prepareInputSnapshot = prepareInputSnapshot;
-               this.closeableRegistry.registerCloseable(this);
+               this.abortedCheckpointIds = 
createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
+               this.lastCheckpointId = -1L;
+               closeableRegistry.registerCloseable(this);
+               this.closed = false;
        }
 
        private ChannelStateWriter openChannelStateWriter() {
@@ -144,6 +198,15 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                // We generally try to emit the checkpoint barrier as soon as 
possible to not affect downstream
                // checkpoint alignments
 
+               // Step (0): Record the last triggered checkpointId.
+               Preconditions.checkArgument(lastCheckpointId < 
metadata.getCheckpointId(), String.format(
+                       "Unexpected current checkpoint-id: %s vs last 
checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId));
+               lastCheckpointId = metadata.getCheckpointId();
+               if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
+                       LOG.info("Checkpoint {} has been notified as aborted, 
would not trigger any checkpoint.", metadata.getCheckpointId());
+                       return;
+               }
+
                // Step (1): Prepare the checkpoint, allow operators to do some 
pre-barrier work.
                //           The pre-barrier work should be nothing or minimal 
in the common case.
                
operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());
@@ -188,6 +251,106 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                
env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId, OperatorChain<?, 
?> operatorChain, Supplier<Boolean> isRunning) throws Exception {
+
+               Exception previousException = null;
+               if (isRunning.get()) {
+                       LOG.debug("Notification of aborted checkpoint for task 
{}", taskName);
+
+                       boolean canceled = 
cancelAsyncCheckpointRunnable(checkpointId);
+
+                       if (!canceled) {
+                               if (checkpointId > lastCheckpointId) {
+                                       // only record checkpoints that have 
not triggered on task side.
+                                       abortedCheckpointIds.add(checkpointId);
+                               }
+                       }
+
+                       for (StreamOperatorWrapper<?, ?> operatorWrapper : 
operatorChain.getAllOperators(true)) {
+                               try {
+                                       
operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId);
+                               } catch (Exception e) {
+                                       previousException = e;
+                               }
+                       }
+
+               } else {
+                       LOG.debug("Ignoring notification of aborted checkpoint 
for not-running task {}", taskName);
+               }
+
+               env.getTaskStateManager().notifyCheckpointAborted(checkpointId);
+               ExceptionUtils.tryRethrowException(previousException);
+       }
+
+       @Override
+       public void close() throws IOException {
+               List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
+               synchronized (lock) {
+                       if (!closed) {
+                               closed = true;
+                               asyncCheckpointRunnables = new 
ArrayList<>(checkpoints.values());
+                               checkpoints.clear();
+                       }
+               }
+               IOUtils.closeAllQuietly(asyncCheckpointRunnables);
+               channelStateWriter.close();
+       }
+
+       @VisibleForTesting
+       int getAsyncCheckpointRunnableSize() {
+               synchronized (lock) {
+                       return checkpoints.size();
+               }
+       }
+
+       @VisibleForTesting
+       int getAbortedCheckpointSize() {
+               return abortedCheckpointIds.size();
+       }
+
+       private boolean checkAndClearAbortedStatus(long checkpointId) {
+               return abortedCheckpointIds.remove(checkpointId);
+       }
+
+       private void registerAsyncCheckpointRunnable(long checkpointId, 
AsyncCheckpointRunnable asyncCheckpointRunnable) throws IOException {
+               StringBuilder exceptionMessage = new StringBuilder("Cannot 
register Closeable, ");
+               synchronized (lock) {
+                       if (!closed) {
+                               if (!checkpoints.containsKey(checkpointId)) {
+                                       checkpoints.put(checkpointId, 
asyncCheckpointRunnable);
+                                       return;
+                               } else {
+                                       exceptionMessage.append("async 
checkpoint ").append(checkpointId).append(" runnable has been register. ");
+                               }
+                       } else {
+                               exceptionMessage.append("this 
subtaskCheckpointCoordinator is already closed. ");
+                       }
+               }
+
+               IOUtils.closeQuietly(asyncCheckpointRunnable);
+               throw new IOException(exceptionMessage.append("Closing 
argument.").toString());
+       }
+
+       private boolean unregisterAsyncCheckpointRunnable(long checkpointId) {
+               synchronized (lock) {
+                       return checkpoints.remove(checkpointId) != null;
+               }
+       }
+
+       /**
+        * Cancel the async checkpoint runnable with given checkpoint id.
+        * If given checkpoint id is not registered, return false, otherwise 
return true.
+        */
+       private boolean cancelAsyncCheckpointRunnable(long checkpointId) {
+               AsyncCheckpointRunnable asyncCheckpointRunnable;
+               synchronized (lock) {
+                       asyncCheckpointRunnable = 
checkpoints.remove(checkpointId);
+               }
+               IOUtils.closeQuietly(asyncCheckpointRunnable);
+               return asyncCheckpointRunnable != null;
+       }
+
        private void cleanup(
                        Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress,
                        CheckpointMetaData metadata,
@@ -251,11 +414,26 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                        channelWrittenFuture,
                        System.nanoTime(),
                        taskName,
-                       closeableRegistry,
+                       registerConsumer(),
+                       unregisterConsumer(),
                        env,
                        asyncExceptionHandler));
        }
 
+       private Consumer<AsyncCheckpointRunnable> registerConsumer() {
+               return asyncCheckpointRunnable -> {
+                       try {
+                               
registerAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId(), 
asyncCheckpointRunnable);
+                       } catch (IOException e) {
+                               throw new UncheckedIOException(e);
+                       }
+               };
+       }
+
+       private Consumer<AsyncCheckpointRunnable> unregisterConsumer() {
+               return asyncCheckpointRunnable -> 
unregisterAsyncCheckpointRunnable(asyncCheckpointRunnable.getCheckpointId());
+       }
+
        private boolean takeSnapshotSync(
                        Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress,
                        CheckpointMetaData checkpointMetaData,
@@ -342,9 +520,15 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                return snapshotInProgress;
        }
 
-       @Override
-       public void close() throws IOException {
-               channelStateWriter.close();
+       private Set<Long> createAbortedCheckpointSetWithLimitSize(int 
maxRecordAbortedCheckpoints) {
+               return Collections.newSetFromMap(new LinkedHashMap<Long, 
Boolean>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       protected boolean removeEldestEntry(Map.Entry<Long, 
Boolean> eldest) {
+                               return size() > maxRecordAbortedCheckpoints;
+                       }
+               });
        }
 
        // Caches checkpoint output stream factories to prevent multiple output 
stream per checkpoint.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 5fb8830..46dfbde9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -93,6 +93,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                        "getMetricGroup[], " +
                        "getOperatorID[], " +
                        "initializeState[interface 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializer], " +
+                       "notifyCheckpointAborted[long], " +
                        "notifyCheckpointComplete[long], " +
                        "open[], " +
                        "prepareSnapshotPreBarrier[long], " +
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index 8d4fae2..3e0703f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -118,7 +118,8 @@ public class LocalStateForwardingTest extends TestLogger {
                        CompletableFuture.completedFuture(null),
                        0L,
                        testStreamTask.getName(),
-                       testStreamTask.getCancelables(),
+                       asyncCheckpointRunnable -> {},
+                       asyncCheckpointRunnable -> {},
                        testStreamTask.getEnvironment(),
                        testStreamTask);
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
index d3898db..e0db33f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java
@@ -47,6 +47,7 @@ public class MockSubtaskCheckpointCoordinatorBuilder {
        private ExecutorService executorService = 
Executors.newDirectExecutorService();
        private BiFunctionWithException<ChannelStateWriter, Long, 
CompletableFuture<Void>, IOException> prepareInputSnapshot = 
(channelStateWriter, aLong) -> FutureUtils.completedVoidFuture();
        private boolean unalignedCheckpointEnabled;
+       private int maxRecordAbortedCheckpoints = 10;
 
        public MockSubtaskCheckpointCoordinatorBuilder 
setEnvironment(Environment environment) {
                this.environment = environment;
@@ -58,6 +59,16 @@ public class MockSubtaskCheckpointCoordinatorBuilder {
                return this;
        }
 
+       public MockSubtaskCheckpointCoordinatorBuilder 
setExecutor(ExecutorService executor) {
+               this.executorService = executor;
+               return this;
+       }
+
+       public MockSubtaskCheckpointCoordinatorBuilder 
setMaxRecordAbortedCheckpoints(int maxRecordAbortedCheckpoints) {
+               this.maxRecordAbortedCheckpoints = maxRecordAbortedCheckpoints;
+               return this;
+       }
+
        public MockSubtaskCheckpointCoordinatorBuilder 
setUnalignedCheckpointEnabled(boolean unalignedCheckpointEnabled) {
                this.unalignedCheckpointEnabled = unalignedCheckpointEnabled;
                return this;
@@ -83,7 +94,8 @@ public class MockSubtaskCheckpointCoordinatorBuilder {
                        environment,
                        asyncExceptionHandler,
                        unalignedCheckpointEnabled,
-                       prepareInputSnapshot);
+                       prepareInputSnapshot,
+                       maxRecordAbortedCheckpoints);
        }
 
        private static class NonHandleAsyncException implements 
AsyncExceptionHandler {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index efe9185..94bb92f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -18,21 +18,45 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -48,9 +72,7 @@ public class SubtaskCheckpointCoordinatorTest {
                        .setEnvironment(mockEnvironment)
                        .build();
 
-               final OperatorChain<?, ?> operatorChain = new OperatorChain<>(
-                       new MockStreamTaskBuilder(new 
DummyEnvironment()).build(),
-                       new NonRecordWriter<>());
+               final OperatorChain<?, ?> operatorChain = 
getOperatorChain(mockEnvironment);
 
                long checkpointId = 42L;
                {
@@ -82,4 +104,273 @@ public class SubtaskCheckpointCoordinatorTest {
                        new OperatorChain<>(new NoOpStreamTask<>(new 
DummyEnvironment()), new NonRecordWriter<>()),
                        () -> false);
        }
+
+       @Test
+       public void testNotifyCheckpointAbortedManyTimes() throws Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+               int maxRecordAbortedCheckpoints = 256;
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       
.setMaxRecordAbortedCheckpoints(maxRecordAbortedCheckpoints)
+                       .build();
+
+               final OperatorChain<?, ?> operatorChain = 
getOperatorChain(mockEnvironment);
+
+               long notifyAbortedTimes = maxRecordAbortedCheckpoints + 42;
+               for (int i = 1; i < notifyAbortedTimes; i++) {
+                       subtaskCheckpointCoordinator.notifyCheckpointAborted(i, 
operatorChain, () -> true);
+                       assertEquals(Math.min(maxRecordAbortedCheckpoints, i), 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+               }
+       }
+
+       @Test
+       public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws 
Exception {
+               TestTaskStateManager stateManager = new TestTaskStateManager();
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().setTaskStateManager(stateManager).build();
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .setUnalignedCheckpointEnabled(true)
+                       .build();
+
+               CheckpointOperator checkpointOperator = new 
CheckpointOperator(new OperatorSnapshotFutures());
+
+               final OperatorChain<String, AbstractStreamOperator<String>> 
operatorChain = operatorChain(checkpointOperator);
+
+               long checkpointId = 42L;
+               // notify checkpoint aborted before execution.
+               
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, () -> true);
+               assertEquals(1, 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+
+               
subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               subtaskCheckpointCoordinator.checkpointState(
+                       new CheckpointMetaData(checkpointId, 
System.currentTimeMillis()),
+                       CheckpointOptions.forCheckpointWithDefaultLocation(),
+                       new CheckpointMetrics(),
+                       operatorChain,
+                       () -> true);
+               assertFalse(checkpointOperator.isCheckpointed());
+               assertEquals(-1, stateManager.getReportedCheckpointId());
+               assertEquals(0, 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+               assertEquals(0, 
subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
+       }
+
+       @Test
+       public void testNotifyCheckpointAbortedDuringAsyncPhase() throws 
Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .setExecutor(Executors.newSingleThreadExecutor())
+                       .setUnalignedCheckpointEnabled(true)
+                       .build();
+
+               final BlockingRunnableFuture rawKeyedStateHandleFuture = new 
BlockingRunnableFuture();
+               OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
+                       DoneFuture.of(SnapshotResult.empty()),
+                       rawKeyedStateHandleFuture,
+                       DoneFuture.of(SnapshotResult.empty()),
+                       DoneFuture.of(SnapshotResult.empty()),
+                       DoneFuture.of(SnapshotResult.empty()),
+                       DoneFuture.of(SnapshotResult.empty()));
+
+               final OperatorChain<String, AbstractStreamOperator<String>> 
operatorChain = operatorChain(new CheckpointOperator(operatorSnapshotResult));
+
+               long checkpointId = 42L;
+               
subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               subtaskCheckpointCoordinator.checkpointState(
+                       new CheckpointMetaData(checkpointId, 
System.currentTimeMillis()),
+                       CheckpointOptions.forCheckpointWithDefaultLocation(),
+                       new CheckpointMetrics(),
+                       operatorChain,
+                       () -> true);
+               rawKeyedStateHandleFuture.awaitRun();
+               assertEquals(1, 
subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
+               assertFalse(rawKeyedStateHandleFuture.isCancelled());
+
+               
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, () -> true);
+               assertTrue(rawKeyedStateHandleFuture.isCancelled());
+               assertEquals(0, 
subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
+       }
+
+       @Test
+       public void testNotifyCheckpointAbortedAfterAsyncPhase() throws 
Exception {
+               TestTaskStateManager stateManager = new TestTaskStateManager();
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().setTaskStateManager(stateManager).build();
+               SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .build();
+
+               final OperatorChain<?, ?> operatorChain = 
getOperatorChain(mockEnvironment);
+
+               long checkpointId = 42L;
+               subtaskCheckpointCoordinator.checkpointState(
+                       new CheckpointMetaData(checkpointId, 
System.currentTimeMillis()),
+                       CheckpointOptions.forCheckpointWithDefaultLocation(),
+                       new CheckpointMetrics(),
+                       operatorChain,
+                       () -> true);
+               
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, () -> true);
+               assertEquals(0, 
subtaskCheckpointCoordinator.getAbortedCheckpointSize());
+               assertEquals(checkpointId, 
stateManager.getNotifiedAbortedCheckpointId());
+       }
+
+       private OperatorChain<?, ?> getOperatorChain(MockEnvironment 
mockEnvironment) throws Exception {
+               return new OperatorChain<>(
+                       new MockStreamTaskBuilder(mockEnvironment).build(),
+                       new NonRecordWriter<>());
+       }
+
+       private <T> OperatorChain<T, AbstractStreamOperator<T>> 
operatorChain(OneInputStreamOperator<T, T>... streamOperators) throws Exception 
{
+               return OperatorChainTest.setupOperatorChain(streamOperators);
+       }
+
+       private static final class BlockingRunnableFuture implements 
RunnableFuture<SnapshotResult<KeyedStateHandle>> {
+
+               private final 
CompletableFuture<SnapshotResult<KeyedStateHandle>> future = new 
CompletableFuture<>();
+
+               private final OneShotLatch signalRunLatch = new OneShotLatch();
+
+               private final CountDownLatch countDownLatch;
+
+               private final SnapshotResult<KeyedStateHandle> value;
+
+               private BlockingRunnableFuture() {
+                       // count down twice to wait for notify checkpoint 
aborted to cancel.
+                       this.countDownLatch = new CountDownLatch(2);
+                       this.value = SnapshotResult.empty();
+               }
+
+               @Override
+               public void run() {
+                       signalRunLatch.trigger();
+                       countDownLatch.countDown();
+
+                       try {
+                               countDownLatch.await();
+                       } catch (InterruptedException e) {
+                               ExceptionUtils.rethrow(e);
+                       }
+
+                       future.complete(value);
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       future.cancel(mayInterruptIfRunning);
+                       return true;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       return future.isCancelled();
+               }
+
+               @Override
+               public boolean isDone() {
+                       return future.isDone();
+               }
+
+               @Override
+               public SnapshotResult<KeyedStateHandle> get() throws 
InterruptedException, ExecutionException {
+                       return future.get();
+               }
+
+               @Override
+               public SnapshotResult<KeyedStateHandle> get(long timeout, 
TimeUnit unit) throws InterruptedException, ExecutionException {
+                       return future.get();
+               }
+
+               void awaitRun() throws InterruptedException {
+                       signalRunLatch.await();
+               }
+       }
+
+       private static class CheckpointOperator implements 
OneInputStreamOperator<String, String> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final OperatorSnapshotFutures operatorSnapshotFutures;
+
+               private boolean checkpointed = false;
+
+               CheckpointOperator(OperatorSnapshotFutures 
operatorSnapshotFutures) {
+                       this.operatorSnapshotFutures = operatorSnapshotFutures;
+               }
+
+               boolean isCheckpointed() {
+                       return checkpointed;
+               }
+
+               @Override
+               public void open() throws Exception {
+               }
+
+               @Override
+               public void close() throws Exception {
+               }
+
+               @Override
+               public void dispose() {
+               }
+
+               @Override
+               public void prepareSnapshotPreBarrier(long checkpointId) {
+               }
+
+               @Override
+               public OperatorSnapshotFutures snapshotState(long checkpointId, 
long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory 
storageLocation) throws Exception {
+                       this.checkpointed = true;
+                       return operatorSnapshotFutures;
+               }
+
+               @Override
+               public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager) throws Exception {
+               }
+
+               @Override
+               public void setKeyContextElement1(StreamRecord<?> record) {
+               }
+
+               @Override
+               public void setKeyContextElement2(StreamRecord<?> record) {
+               }
+
+               @Override
+               public MetricGroup getMetricGroup() {
+                       return null;
+               }
+
+               @Override
+               public OperatorID getOperatorID() {
+                       return new OperatorID();
+               }
+
+               @Override
+               public void notifyCheckpointComplete(long checkpointId) {
+               }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
+               @Override
+               public void setCurrentKey(Object key) {
+               }
+
+               @Override
+               public Object getCurrentKey() {
+                       return null;
+               }
+
+               @Override
+               public void processElement(StreamRecord<String> element) throws 
Exception {
+               }
+
+               @Override
+               public void processWatermark(Watermark mark) throws Exception {
+               }
+
+               @Override
+               public void processLatencyMarker(LatencyMarker latencyMarker) {
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index ca5ee61..eff8c64 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -68,6 +68,7 @@ import org.junit.Test;
 import org.junit.rules.Timeout;
 
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -175,6 +176,11 @@ public class SynchronousCheckpointITCase {
                }
 
                @Override
+               public Future<Void> notifyCheckpointAbortAsync(long 
checkpointId) {
+                       return CompletableFuture.completedFuture(null);
+               }
+
+               @Override
                protected void init() {
                }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java
index 28d92b9..df3a568 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/FailingCollectionSource.java
@@ -245,6 +245,10 @@ public class FailingCollectionSource<T>
                lastCheckpointedEmittedNum = 
checkpointedEmittedNums.get(checkpointId);
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
        public static void reset() {
                failedBefore = false;
        }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
index 56b7d21..02b70b4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
@@ -176,4 +176,7 @@ class FiniteTestSource(elements: Iterable[Row]) extends 
SourceFunction[Row] with
   override def notifyCheckpointComplete(checkpointId: Long): Unit = {
     numCheckpointsComplete += 1
   }
+
+  @throws[Exception]
+  override def notifyCheckpointAborted(checkpointId: Long): Unit = {}
 }
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
index b3a9546..c85aaaa 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
@@ -91,4 +91,8 @@ public class FiniteTestSource<T> implements 
SourceFunction<T>, CheckpointListene
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                numCheckpointsComplete++;
        }
+
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
index 12d186f..42c889c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java
@@ -352,6 +352,11 @@ public class JobMasterStopWithSavepointIT extends 
AbstractTestBase {
 
                        return 
super.notifyCheckpointCompleteAsync(checkpointId);
                }
+
+               @Override
+               public Future<Void> notifyCheckpointAbortAsync(long 
checkpointId) {
+                       return CompletableFuture.completedFuture(null);
+               }
        }
 
        /**
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index 4193694..8605623 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -251,6 +251,11 @@ public class JobMasterTriggerSavepointITCase extends 
AbstractTestBase {
                public Future<Void> notifyCheckpointCompleteAsync(final long 
checkpointId) {
                        return CompletableFuture.completedFuture(null);
                }
+
+               @Override
+               public Future<Void> notifyCheckpointAbortAsync(long 
checkpointId) {
+                       return CompletableFuture.completedFuture(null);
+               }
        }
 
        private String cancelWithSavepoint() throws Exception {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
index b207de8..cf49d5a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CoStreamCheckpointingITCase.java
@@ -227,6 +227,10 @@ public class CoStreamCheckpointingITCase extends 
AbstractTestBase {
                        }
                }
 
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
                private static String randomString(StringBuilder bld, Random 
rnd) {
                        final int len = rnd.nextInt(10) + 5;
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 612ed43..fa6a0d1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -247,6 +247,10 @@ public class ContinuousFileProcessingCheckpointITCase 
extends StreamFaultToleran
                        this.successfulCheckpoints++;
                }
 
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
                private int getFileIdx(String line) {
                        String[] tkns = line.split(":");
                        return Integer.parseInt(tkns[0]);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index e7f9921..b2895e6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -265,6 +265,10 @@ public class KeyedStateCheckpointingITCase extends 
TestLogger {
                                this.notifyAll();
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        private static class OnceFailingPartitionedSum
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
index 0fcfb8f..a243843 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java
@@ -364,6 +364,10 @@ public class StateCheckpointedITCase extends 
StreamFaultToleranceTestBase {
                public void notifyCheckpointComplete(long checkpointId) {
                        this.wasCheckpointed = true;
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        private static class ValidatingSink extends 
RichSinkFunction<PrefixCount>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 7b058a0..ece4f62 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -250,6 +250,10 @@ public class StreamCheckpointNotifierITCase extends 
AbstractTestBase {
                                
GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        /**
@@ -281,6 +285,10 @@ public class StreamCheckpointNotifierITCase extends 
AbstractTestBase {
                                
GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        /**
@@ -312,6 +320,10 @@ public class StreamCheckpointNotifierITCase extends 
AbstractTestBase {
                                
GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        /**
@@ -349,6 +361,10 @@ public class StreamCheckpointNotifierITCase extends 
AbstractTestBase {
                                
GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        /**
@@ -414,5 +430,9 @@ public class StreamCheckpointNotifierITCase extends 
AbstractTestBase {
                                
GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index b8f3f16..47e7007 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -248,6 +248,10 @@ public class UnalignedCheckpointITCase extends TestLogger {
                }
 
                @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
+               @Override
                public void run(SourceContext<Long> ctx) throws Exception {
                        int increment = 
getRuntimeContext().getNumberOfParallelSubtasks();
                        info("First emitted input {}", state.nextNumber);
@@ -446,6 +450,10 @@ public class UnalignedCheckpointITCase extends TestLogger {
                }
 
                @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
+               @Override
                public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
                        checkFail(failDuringSnapshot, "snapshotState");
                        listState.clear();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 5758f76..81df28f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -422,6 +422,10 @@ public class ZooKeeperHighAvailabilityITCase extends 
TestLogger {
                                
checkpointCompletedIncludingData.compareAndSet(false, true);
                        }
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
 
        /**
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java
index a4b66d6..4fd967c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.java
@@ -75,6 +75,10 @@ public class AccumulatingIntegerSink extends 
RichSinkFunction<Integer> implement
                
pendingForAccumulator.remove(checkpointId).forEach(accumulator::add);
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
        @SuppressWarnings("unchecked")
        public static List<Integer> getOutput(Map<String, Object> accumulators) 
{
                return (List<Integer>) accumulators.get(ACCUMULATOR_NAME);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java
index 1f49c2e..93ad11b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.java
@@ -112,6 +112,10 @@ public class CancellingIntegerSource extends 
RichSourceFunction<Integer> impleme
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public void cancel() {
                isCanceled = true;
        }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
index ff49959..82ca91e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
@@ -135,6 +135,10 @@ public class FailingSource extends 
RichSourceFunction<Tuple2<Long, IntType>>
        }
 
        @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+       }
+
+       @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) 
throws Exception {
                // We accept a checkpoint as basis if it should have a "decent 
amount" of state
                if (emitCallCount > failureAfterNumElements / 2) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
index 234c473..f1fae2c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java
@@ -123,6 +123,10 @@ public class CheckpointedStreamingProgram {
                public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                        atLeastOneSnapshotComplete = true;
                }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
        }
        // 
--------------------------------------------------------------------------------------------
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index 0d5f2d7..ca178c7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -166,6 +166,10 @@ public class CheckpointingCustomKvStateProgram {
                        atLeastOneSnapshotComplete = true;
                }
 
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
                private static class ReduceSum implements 
ReduceFunction<Integer> {
                        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
index db03220..29c0346 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
@@ -257,6 +257,10 @@ public class ReinterpretDataStreamAsKeyedStreamITCase {
                }
 
                @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+               }
+
+               @Override
                public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
                        positionState.clear();
                        positionState.add(position);

Reply via email to