This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9dce8f0dcf68492240933a74dbce760ae25ce07f
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Sun Oct 25 10:07:16 2020 +0800

    Backport of "[FLINK-19698][connector/common] Let 
SplitEnumerator/SourceReader/SourceCoordinator implement CheckpointListener."
    
    The backport does not move the 'CheckpointListener' interface, to avoid breaking 
compatibility between minor versions.
    Instead, it lets the Reader and Enumerator offer the same methods directly. 
This is compatible with inheriting
    these methods in 1.12 (and 1.12 adds the interface to the class inheritance 
signature, which is also non-breaking).
---
 .../base/source/reader/SourceReaderBase.java       |  2 +-
 .../base/source/reader/SourceReaderBaseTest.java   |  6 ++-
 .../base/source/reader/SourceReaderTestBase.java   |  2 +-
 .../base/source/reader/mocks/MockSourceReader.java |  5 ++
 .../flink/api/connector/source/SourceReader.java   | 19 +++++++-
 .../api/connector/source/SplitEnumerator.java      | 17 ++++++-
 .../source/lib/util/IteratorSourceReader.java      |  2 +-
 .../source/lib/NumberSequenceSourceTest.java       |  2 +-
 .../connector/source/mocks/MockSourceReader.java   | 22 ++++++++-
 .../source/mocks/MockSplitEnumerator.java          | 21 +++++++--
 .../runtime/checkpoint/CheckpointCoordinator.java  |  7 ++-
 .../OperatorCoordinatorCheckpointContext.java      | 19 +++++++-
 .../coordination/OperatorCoordinator.java          | 40 +++++++---------
 .../coordination/OperatorCoordinatorHolder.java    | 18 +++++---
 .../RecreateOnResetOperatorCoordinator.java        |  4 +-
 .../source/coordinator/SourceCoordinator.java      | 22 +++++++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 53 ++++++++++++++++++++++
 .../CheckpointCoordinatorTestingUtils.java         | 19 +++++++-
 .../CoordinatorEventsExactlyOnceITCase.java        |  2 +-
 .../coordination/MockOperatorCoordinator.java      |  2 +-
 .../OperatorCoordinatorHolderTest.java             |  2 +-
 .../coordination/TestingOperatorCoordinator.java   |  2 +-
 .../source/coordinator/SourceCoordinatorTest.java  | 12 ++++-
 .../streaming/api/operators/SourceOperator.java    | 17 ++++++-
 .../collect/CollectSinkOperatorCoordinator.java    |  2 +-
 .../api/operators/SourceOperatorTest.java          | 53 +++++++++++++++++++++-
 .../source/SourceOperatorEventTimeTest.java        |  2 +-
 .../operators/source/TestingSourceOperator.java    | 13 ++++++
 28 files changed, 321 insertions(+), 66 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index e1da654..2c6c1812 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -204,7 +204,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        }
 
        @Override
-       public List<SplitT> snapshotState() {
+       public List<SplitT> snapshotState(long checkpointId) {
                List<SplitT> splits = new ArrayList<>();
                splitStates.forEach((id, context) -> splits.add(toSplitType(id, 
context.state)));
                return splits;
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 6396b2e..da057da 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -185,8 +185,10 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        new TestingReaderContext()) {
 
                        @Override
-                       protected void onSplitFinished(Collection<String> 
finishedSplitIds) {
-                       }
+                       public void notifyCheckpointComplete(long checkpointId) 
throws Exception {}
+
+                       @Override
+                       protected void onSplitFinished(Collection<String> 
finishedSplitIds) {}
 
                        @Override
                        protected TestingSourceSplit 
initializedState(TestingSourceSplit split) {
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index 22dda52..015cd89 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -132,7 +132,7 @@ public abstract class SourceReaderTestBase<SplitT extends 
SourceSplit> extends T
                List<SplitT> splits = getSplits(NUM_SPLITS, 
NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED);
                try (SourceReader<Integer, SplitT> reader =
                                consumeRecords(splits, output, NUM_SPLITS * 
NUM_RECORDS_PER_SPLIT)) {
-                       List<SplitT> state = reader.snapshotState();
+                       List<SplitT> state = reader.snapshotState(1L);
                        assertEquals("The snapshot should only have 10 splits. 
", NUM_SPLITS, state.size());
                        for (int i = 0; i < NUM_SPLITS; i++) {
                                assertEquals("The first four splits should have 
been fully consumed.",
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 66022db..9de22aa 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -57,4 +57,9 @@ public class MockSourceReader
        protected MockSourceSplit toSplitType(String splitId, AtomicInteger 
splitState) {
                return new MockSourceSplit(Integer.parseInt(splitId), 
splitState.get());
        }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+
+       }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index 3e623dd..0aac1b1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -32,7 +32,8 @@ import java.util.concurrent.CompletableFuture;
  * @param <SplitT> The type of the the source splits.
  */
 @PublicEvolving
-public interface SourceReader<T, SplitT extends SourceSplit> extends 
AutoCloseable {
+public interface SourceReader<T, SplitT extends SourceSplit>
+               extends AutoCloseable {
 
        /**
         * Start the reader.
@@ -58,7 +59,7 @@ public interface SourceReader<T, SplitT extends SourceSplit> 
extends AutoCloseab
         *
         * @return the state of the source.
         */
-       List<SplitT> snapshotState();
+       List<SplitT> snapshotState(long checkpointId);
 
        /**
         * @return a future that will be completed once there is a record 
available to poll.
@@ -78,4 +79,18 @@ public interface SourceReader<T, SplitT extends SourceSplit> 
extends AutoCloseab
         * @param sourceEvent the event sent by the {@link SplitEnumerator}.
         */
        void handleSourceEvents(SourceEvent sourceEvent);
+
+       /**
+        * We have an empty default implementation here because most source 
readers do not have
+        * to implement the method.
+        */
+       default void notifyCheckpointComplete(long checkpointId) throws 
Exception {}
+
+       /**
+        * Called when a checkpoint is aborted.
+        *
+        * <p>NOTE: This method is here only in the backport to the Flink 1.11 
branch.
+        * In 1.12, this default method is inherited from the 
CheckpointListener interface.
+        */
+       default void notifyCheckpointAborted(long checkpointId) {}
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index 1e429bb..e97f5e1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -29,7 +29,8 @@ import java.util.List;
  * 2. assign the splits to the source reader.
  */
 @PublicEvolving
-public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> 
extends AutoCloseable {
+public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
+               extends AutoCloseable {
 
        /**
         * Start the split enumerator.
@@ -76,4 +77,18 @@ public interface SplitEnumerator<SplitT extends SourceSplit, 
CheckpointT> extend
         */
        @Override
        void close() throws IOException;
+
+       /**
+        * We have an empty default implementation here because most source 
readers do not have
+        * to implement the method.
+        */
+       default void notifyCheckpointComplete(long checkpointId) throws 
Exception {}
+
+       /**
+        * Called when a checkpoint is aborted.
+        *
+        * <p>NOTE: This method is here only in the backport to the Flink 1.11 
branch.
+        * In 1.12, this default method is inherited from the 
CheckpointListener interface.
+        */
+       default void notifyCheckpointAborted(long checkpointId) {}
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index 54ddb35..3af75e1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -122,7 +122,7 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
        }
 
        @Override
-       public List<SplitT> snapshotState() {
+       public List<SplitT> snapshotState(long checkpointId) {
                final ArrayList<SplitT> allSplits = new ArrayList<>(1 + 
remainingSplits.size());
                if (iterator != null) {
                        @SuppressWarnings("unchecked")
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index 450983e..48cda2d 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -54,7 +54,7 @@ public class NumberSequenceSourceTest {
                        if (--remainingInCycle <= 0) {
                                remainingInCycle = elementsPerCycle;
                                // checkpoint
-                               List<NumberSequenceSource.NumberSequenceSplit> 
splits = reader.snapshotState();
+                               List<NumberSequenceSource.NumberSequenceSplit> 
splits = reader.snapshotState(1L);
 
                                // re-create and restore
                                reader = createReader();
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index e48f827..b0a24dd 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -35,6 +35,8 @@ import java.util.concurrent.CompletableFuture;
 public class MockSourceReader implements SourceReader<Integer, 
MockSourceSplit> {
        private final List<MockSourceSplit> assignedSplits = new ArrayList<>();
        private final List<SourceEvent> receivedSourceEvents = new 
ArrayList<>();
+       private final List<Long> completedCheckpoints = new ArrayList<>();
+       private final List<Long> abortedCheckpoints = new ArrayList<>();
        private final boolean markIdleOnNoSplits;
 
        private int currentSplitIndex = 0;
@@ -90,7 +92,7 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
        }
 
        @Override
-       public List<MockSourceSplit> snapshotState() {
+       public List<MockSourceSplit> snapshotState(long checkpointId) {
                return assignedSplits;
        }
 
@@ -119,6 +121,16 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                this.closed = true;
        }
 
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) {
+               completedCheckpoints.add(checkpointId);
+       }
+
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               abortedCheckpoints.add(checkpointId);
+       }
+
        private synchronized void markUnavailable() {
                if (availableFuture.isDone()) {
                        availableFuture = new CompletableFuture<>();
@@ -155,6 +167,14 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                return receivedSourceEvents;
        }
 
+       public List<Long> getCompletedCheckpoints() {
+               return completedCheckpoints;
+       }
+
+       public List<Long> getAbortedCheckpoints() {
+               return abortedCheckpoints;
+       }
+
        /**
         * Simple event allowing {@link MockSourceReader} to finish when 
requested.
         */
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 93f6c77..0525533 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -39,11 +39,12 @@ import java.util.TreeSet;
  * A mock {@link SplitEnumerator} for unit tests.
  */
 public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, 
Set<MockSourceSplit>> {
-       private SortedSet<MockSourceSplit> unassignedSplits;
-       private SplitEnumeratorContext<MockSourceSplit> enumContext;
-       private List<SourceEvent> handledSourceEvent;
-       private boolean started;
-       private boolean closed;
+       private final SortedSet<MockSourceSplit> unassignedSplits;
+       private final SplitEnumeratorContext<MockSourceSplit> enumContext;
+       private final List<SourceEvent> handledSourceEvent;
+       private final List<Long> successfulCheckpoints;
+       private volatile boolean started;
+       private volatile boolean closed;
 
        public MockSplitEnumerator(int numSplits, 
SplitEnumeratorContext<MockSourceSplit> enumContext) {
                this(new HashSet<>(), enumContext);
@@ -59,6 +60,7 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Set
                this.unassignedSplits.addAll(unassignedSplits);
                this.enumContext = enumContext;
                this.handledSourceEvent = new ArrayList<>();
+               this.successfulCheckpoints = new ArrayList<>();
                this.started = false;
                this.closed = false;
        }
@@ -96,6 +98,11 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Set
        }
 
        @Override
+       public void notifyCheckpointComplete(long checkpointId) {
+               successfulCheckpoints.add(checkpointId);
+       }
+
+       @Override
        public void close() throws IOException {
                this.closed = true;
        }
@@ -123,6 +130,10 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Set
                return handledSourceEvent;
        }
 
+       public List<Long> getSuccessfulCheckpoints() {
+               return successfulCheckpoints;
+       }
+
        // --------------------
 
        private void assignAllSplits() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 26cf391..0e61a11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1144,7 +1144,7 @@ public class CheckpointCoordinator {
 
                // commit coordinators
                for (OperatorCoordinatorCheckpointContext coordinatorContext : 
coordinatorsToCheckpoint) {
-                       coordinatorContext.checkpointComplete(checkpointId);
+                       
coordinatorContext.notifyCheckpointComplete(checkpointId);
                }
        }
 
@@ -1159,6 +1159,11 @@ public class CheckpointCoordinator {
                                }
                        }
                });
+
+               // notify coordinators that the checkpoint was aborted
+               for (OperatorCoordinatorCheckpointContext coordinatorContext : 
coordinatorsToCheckpoint) {
+                       
coordinatorContext.notifyCheckpointAborted(checkpointId);
+               }
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
index 94d4e07..f5f72d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorInfo;
+import org.apache.flink.runtime.state.CheckpointListener;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -27,7 +28,7 @@ import java.util.concurrent.CompletableFuture;
  * This context is the interface through which the {@link 
CheckpointCoordinator} interacts with an
  * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
  */
-public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, 
CheckpointListener {
 
        void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> 
result) throws Exception;
 
@@ -35,7 +36,21 @@ public interface OperatorCoordinatorCheckpointContext 
extends OperatorInfo {
 
        void abortCurrentTriggering();
 
-       void checkpointComplete(long checkpointId);
+       /**
+        * We override the method here to remove the checked exception. Please 
check the
+        * Java docs of {@link 
CheckpointListener#notifyCheckpointComplete(long)} for the
+        * detailed semantics of the method.
+        */
+       @Override
+       void notifyCheckpointComplete(long checkpointId);
+
+       /**
+        * We override the method here to remove the checked exception. Please 
check the
+        * Java docs of {@link 
CheckpointListener#notifyCheckpointAborted(long)} for the
+        * detailed semantics of the method.
+        */
+       @Override
+       default void notifyCheckpointAborted(long checkpointId) {}
 
        void resetToCheckpoint(byte[] checkpointData) throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index fff88c7..bc280f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.state.CheckpointListener;
 
 import javax.annotation.Nullable;
 
@@ -43,7 +44,7 @@ import java.util.concurrent.CompletableFuture;
  * The methods on the {@link Context} are safe to be called from another 
thread than the thread that
  * calls the Coordinator's methods.
  */
-public interface OperatorCoordinator extends AutoCloseable {
+public interface OperatorCoordinator extends CheckpointListener, AutoCloseable 
{
 
        /**
         * Starts the coordinator. This method is called once at the beginning, 
before any other methods.
@@ -102,27 +103,20 @@ public interface OperatorCoordinator extends 
AutoCloseable {
        void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> 
resultFuture) throws Exception;
 
        /**
-        * Notifies the coordinator that the checkpoint with the given 
checkpointId completes and
-        * was committed.
-        *
-        * <h3>Checkpoint Subsuming</h3>
-        *
-        * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher 
ID always subsumes
-        * a checkpoint with lower ID. For example, when checkpoint T is 
confirmed complete, the
-        * code should treat all checkpoints with lower ID (T-1, T-2, etc.) 
also as confirmed.
-        *
-        * <h3>Exceptions</h3>
-        *
-        * <p>This method is not supposed to throw an exception indicating the 
the checkpoint cannot
-        * be completed. By the time we notify that the checkpoint is complete, 
the checkpoint is
-        * committed and cannot be aborted any more.
-        *
-        * <p>If the coordinator gets into an inconsistent state internally, as 
a result of logic that
-        * runs after this notification, it should fail the job ({@link 
Context#failJob(Throwable)})
-        * instead. Any exception propagating from this method may be treated 
as a fatal error for the
-        * JobManager, crashing the JobManager, and leading to an expensive 
"master failover" procedure.
+        * We override the method here to remove the checked exception. Please 
check the
+        * Java docs of {@link 
CheckpointListener#notifyCheckpointComplete(long)} for the
+        * detailed semantics of the method.
         */
-       void checkpointComplete(long checkpointId);
+       @Override
+       void notifyCheckpointComplete(long checkpointId);
+
+       /**
+        * We override the method here to remove the checked exception. Please 
check the
+        * Java docs of {@link 
CheckpointListener#notifyCheckpointAborted(long)} for the
+        * detailed semantics of the method.
+        */
+       @Override
+       default void notifyCheckpointAborted(long checkpointId) {}
 
        /**
         * Resets the coordinator to the given checkpoint.
@@ -139,9 +133,9 @@ public interface OperatorCoordinator extends AutoCloseable {
         * It is safe to commit side-effects that are predicated on checkpoint 
completion after this
         * call.
         *
-        * <p>Even if no call to {@link #checkpointComplete(long)} happened, 
the checkpoint can still be
+        * <p>Even if no call to {@link #notifyCheckpointComplete(long)} 
happened, the checkpoint can still be
         * complete (for example when a system failure happened directly after 
committing the checkpoint,
-        * before calling the {@link #checkpointComplete(long)} method).
+        * before calling the {@link #notifyCheckpointComplete(long)} method).
         */
        void resetToCheckpoint(byte[] checkpointData) throws Exception;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 4e4e8f5..9819754 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -208,12 +208,21 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
        }
 
        @Override
-       public void checkpointComplete(long checkpointId) {
+       public void notifyCheckpointComplete(long checkpointId) {
                // unfortunately, this method does not run in the scheduler 
executor, but in the
                // checkpoint coordinator time thread.
                // we can remove the delegation once the checkpoint coordinator 
runs fully in the scheduler's
                // main thread executor
-               mainThreadExecutor.execute(() -> 
checkpointCompleteInternal(checkpointId));
+               mainThreadExecutor.execute(() -> 
coordinator.notifyCheckpointComplete(checkpointId));
+       }
+
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               // unfortunately, this method does not run in the scheduler 
executor, but in the
+               // checkpoint coordinator time thread.
+               // we can remove the delegation once the checkpoint coordinator 
runs fully in the scheduler's
+               // main thread executor
+               mainThreadExecutor.execute(() -> 
coordinator.notifyCheckpointAborted(checkpointId));
        }
 
        @Override
@@ -225,11 +234,6 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                coordinator.resetToCheckpoint(checkpointData);
        }
 
-       private void checkpointCompleteInternal(long checkpointId) {
-               mainThreadExecutor.assertRunningInMainThread();
-               coordinator.checkpointComplete(checkpointId);
-       }
-
        private void checkpointCoordinatorInternal(final long checkpointId, 
final CompletableFuture<byte[]> result) {
                mainThreadExecutor.assertRunningInMainThread();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index ea91156..c19997c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -73,8 +73,8 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void checkpointComplete(long checkpointId) {
-               coordinator.checkpointComplete(checkpointId);
+       public void notifyCheckpointComplete(long checkpointId) {
+               coordinator.notifyCheckpointComplete(checkpointId);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 3aa095d..a22a239 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -181,15 +181,31 @@ public class SourceCoordinator<SplitT extends 
SourceSplit, EnumChkT> implements
        }
 
        @Override
-       public void checkpointComplete(long checkpointId) {
+       public void notifyCheckpointComplete(long checkpointId) {
                ensureStarted();
                coordinatorExecutor.execute(() -> {
                        try {
                                LOG.info("Marking checkpoint {} as completed 
for source {}.", checkpointId, operatorName);
                                context.onCheckpointComplete(checkpointId);
+                               
enumerator.notifyCheckpointComplete(checkpointId);
                        } catch (Exception e) {
-                               LOG.error("Failing the job due to exception 
when completing the checkpoint {} for source {}.",
-                                               checkpointId, operatorName, e);
+                               LOG.error("Failing the job due to exception 
when notifying the completion of the "
+                                       + "checkpoint {} for source {}.", 
checkpointId, operatorName, e);
+                               context.failJob(e);
+                       }
+               });
+       }
+
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               ensureStarted();
+               coordinatorExecutor.execute(() -> {
+                       try {
+                               LOG.info("Marking checkpoint {} as aborted for 
source {}.", checkpointId, operatorName);
+                               
enumerator.notifyCheckpointAborted(checkpointId);
+                       } catch (Exception e) {
+                               LOG.error("Failing the job due to exception 
when notifying abortion of the "
+                                       + "checkpoint {} for source {}.", 
checkpointId, operatorName, e);
                                context.failJob(e);
                        }
                });
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 78e5336..1d89596 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -94,6 +95,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionJobVertex;
@@ -2562,6 +2564,57 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                
assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
        }
 
+       @Test
+       public void testNotifyCheckpointAbortionInOperatorCoordinator() throws 
Exception {
+               JobID jobId = new JobID();
+               final ExecutionAttemptID attemptID = new ExecutionAttemptID();
+               ExecutionVertex executionVertex = 
mockExecutionVertex(attemptID);
+
+               
CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext 
context =
+                               new 
CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
+                                       .setOperatorID(new OperatorID())
+                                       
.setOnCallingCheckpointCoordinator((ignored, future) -> future.complete(new 
byte[0]))
+                                       .build();
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinatorBuilder()
+                               .setJobId(jobId)
+                               .setTasks(new 
ExecutionVertex[]{executionVertex})
+                               .setCheckpointCoordinatorConfiguration(
+                                               
CheckpointCoordinatorConfiguration
+                                                         .builder()
+                                                         
.setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
+                                                         .build())
+                               .setTimer(manuallyTriggeredScheduledExecutor)
+                               
.setCoordinatorsToCheckpoint(Collections.singleton(context))
+                               .build();
+               try {
+                       // Trigger checkpoint 1.
+                       checkpointCoordinator.triggerCheckpoint(false);
+                       manuallyTriggeredScheduledExecutor.triggerAll();
+                       long checkpointId1 = 
Collections.max(checkpointCoordinator
+                                                                               
                                 .getPendingCheckpoints()
+                                                                               
                                 .keySet());
+                       // Trigger checkpoint 2.
+                       checkpointCoordinator.triggerCheckpoint(false);
+                       manuallyTriggeredScheduledExecutor.triggerAll();
+
+                       // Acknowledge checkpoint 2. This should abort 
checkpoint 1.
+                       long checkpointId2 = 
Collections.max(checkpointCoordinator
+                                                                               
                                 .getPendingCheckpoints()
+                                                                               
                                 .keySet());
+                       AcknowledgeCheckpoint acknowledgeCheckpoint1 = new 
AcknowledgeCheckpoint(
+                                       jobId, attemptID, checkpointId2, new 
CheckpointMetrics(), null);
+                       
checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, "");
+
+                       // OperatorCoordinator should have been notified of the 
abortion of checkpoint 1.
+                       assertEquals(Collections.singletonList(1L), 
context.getAbortedCheckpoints());
+                       assertEquals(Collections.singletonList(2L), 
context.getCompletedCheckpoints());
+               } finally {
+                       checkpointCoordinator.shutdown(JobStatus.FINISHED);
+               }
+       }
+
        private CheckpointCoordinator getCheckpointCoordinator(
                JobID jobId,
                ExecutionVertex vertex1,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index df1395f..7e7e731 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -791,6 +791,8 @@ public class CheckpointCoordinatorTestingUtils {
                private final BiConsumer<Long, CompletableFuture<byte[]>> 
onCallingCheckpointCoordinator;
                private final Consumer<Long> 
onCallingAfterSourceBarrierInjection;
                private final OperatorID operatorID;
+               private final List<Long> completedCheckpoints;
+               private final List<Long> abortedCheckpoints;
 
                private MockOperatorCoordinatorCheckpointContext(
                                BiConsumer<Long, CompletableFuture<byte[]>> 
onCallingCheckpointCoordinator,
@@ -799,6 +801,8 @@ public class CheckpointCoordinatorTestingUtils {
                         this.onCallingCheckpointCoordinator = 
onCallingCheckpointCoordinator;
                         this.onCallingAfterSourceBarrierInjection = 
onCallingAfterSourceBarrierInjection;
                         this.operatorID = operatorID;
+                        this.completedCheckpoints = new ArrayList<>();
+                        this.abortedCheckpoints = new ArrayList<>();
                }
 
                @Override
@@ -821,8 +825,13 @@ public class CheckpointCoordinatorTestingUtils {
                }
 
                @Override
-               public void checkpointComplete(long checkpointId) {
+               public void notifyCheckpointComplete(long checkpointId) {
+                       completedCheckpoints.add(checkpointId);
+               }
 
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) {
+                       abortedCheckpoints.add(checkpointId);
                }
 
                @Override
@@ -844,6 +853,14 @@ public class CheckpointCoordinatorTestingUtils {
                public int currentParallelism() {
                        return 1;
                }
+
+               public List<Long> getCompletedCheckpoints() {
+                       return completedCheckpoints;
+               }
+
+               public List<Long> getAbortedCheckpoints() {
+                       return abortedCheckpoints;
+               }
        }
 
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index be6f07c..e3047d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -299,7 +299,7 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
                }
 
                @Override
-               public void checkpointComplete(long checkpointId) {}
+               public void notifyCheckpointComplete(long checkpointId) {}
 
                @SuppressWarnings("CallToPrintStackTrace")
                @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
index f4e8444..8c2c6ff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -54,7 +54,7 @@ public final class MockOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void checkpointComplete(long checkpointId) {
+       public void notifyCheckpointComplete(long checkpointId) {
                throw new UnsupportedOperationException();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index 29ec382..2c34de6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -532,7 +532,7 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
                public abstract void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result) throws Exception;
 
                @Override
-               public void checkpointComplete(long checkpointId) {}
+               public void notifyCheckpointComplete(long checkpointId) {}
 
                @Override
                public void resetToCheckpoint(byte[] checkpointData) throws 
Exception {}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index e91bce3..e4d8b04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -81,7 +81,7 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
        }
 
        @Override
-       public void checkpointComplete(long checkpointId) {
+       public void notifyCheckpointComplete(long checkpointId) {
                lastCheckpointComplete.offer(checkpointId);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 3e48a93..6f6b5ba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -30,12 +30,14 @@ import 
org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static 
org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
 import static 
org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
 import static org.junit.Assert.assertEquals;
@@ -52,7 +54,7 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
        public void testThrowExceptionWhenNotStarted() {
                // The following methods should only be invoked after the 
source coordinator has started.
                String failureMessage = "Call should fail when source 
coordinator has not started yet.";
-               verifyException(() -> 
sourceCoordinator.checkpointComplete(100L),
+               verifyException(() -> 
sourceCoordinator.notifyCheckpointComplete(100L),
                                failureMessage, "The coordinator has not 
started yet.");
                verifyException(() -> 
sourceCoordinator.handleEventFromOperator(0, null),
                                failureMessage, "The coordinator has not 
started yet.");
@@ -209,7 +211,13 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
                checkpointFuture.get();
 
                // Complete checkpoint 100.
-               sourceCoordinator.checkpointComplete(100L);
+               sourceCoordinator.notifyCheckpointComplete(100L);
+               waitUtil(
+                               () -> 
!enumerator.getSuccessfulCheckpoints().isEmpty(),
+                               Duration.ofMillis(1000L),
+                               "The enumerator failed to process the 
successful checkpoint "
+                                               + "before times out.");
+               assertEquals(100L, (long) 
enumerator.getSuccessfulCheckpoints().get(0));
 
                // Fail reader 0.
                sourceCoordinator.subtaskFailed(0, null);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 46d4239..b18d85e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -225,8 +225,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
        @Override
        public void snapshotState(StateSnapshotContext context) throws 
Exception {
-               LOG.debug("Taking a snapshot for checkpoint {}", 
context.getCheckpointId());
-               readerState.update(sourceReader.snapshotState());
+               long checkpointId = context.getCheckpointId();
+               LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);
+               readerState.update(sourceReader.snapshotState(checkpointId));
        }
 
        @Override
@@ -241,6 +242,18 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
                readerState = new SimpleVersionedListState<>(rawState, 
splitSerializer);
        }
 
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               super.notifyCheckpointComplete(checkpointId);
+               sourceReader.notifyCheckpointComplete(checkpointId);
+       }
+
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) throws Exception 
{
+               super.notifyCheckpointAborted(checkpointId);
+               sourceReader.notifyCheckpointAborted(checkpointId);
+       }
+
        @SuppressWarnings("unchecked")
        public void handleOperatorEvent(OperatorEvent event) {
                if (event instanceof AddSplitEvent) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index 695e4bd..1423415 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -199,7 +199,7 @@ public class CollectSinkOperatorCoordinator implements 
OperatorCoordinator, Coor
        }
 
        @Override
-       public void checkpointComplete(long checkpointId) {
+       public void notifyCheckpointComplete(long checkpointId) {
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 3b381fd..65228f0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -18,17 +18,21 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
@@ -36,10 +40,15 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.util.MockOutput;
+import org.apache.flink.streaming.util.MockStreamConfig;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.After;
@@ -47,6 +56,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -69,15 +79,24 @@ public class SourceOperatorTest {
        private SourceOperator<Integer, MockSourceSplit> operator;
 
        @Before
-       public void setup() {
+       public void setup() throws Exception {
                this.mockSourceReader = new MockSourceReader();
                this.mockGateway = new MockOperatorEventGateway();
                this.operator = new TestingSourceOperator<>(mockSourceReader, 
mockGateway, SUBTASK_INDEX);
+               Environment env = getTestingEnvironment();
+               this.operator.setup(
+                               new SourceOperatorStreamTask<Integer>(env),
+                               new MockStreamConfig(new Configuration(), 1),
+                               new MockOutput<>(new ArrayList<>()));
+               this.operator.initializeState(new 
StreamTaskStateInitializerImpl(env, new MemoryStateBackend()));
        }
 
        @After
        public void cleanUp() throws Exception {
                operator.close();
+               if (((TestingSourceOperator<Integer>) 
operator).isReaderCreated()) {
+                       assertTrue(mockSourceReader.isClosed());
+               }
        }
 
        @Test
@@ -104,7 +123,6 @@ public class SourceOperatorTest {
                OperatorEvent operatorEvent = 
mockGateway.getEventsSent().get(0);
                assertTrue(operatorEvent instanceof ReaderRegistrationEvent);
                assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) 
operatorEvent).subtaskId());
-               assertTrue(mockSourceReader.isClosed());
        }
 
        @Test
@@ -155,6 +173,26 @@ public class SourceOperatorTest {
                assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), 
splitsInState);
        }
 
+       @Test
+       public void testNotifyCheckpointComplete() throws Exception {
+               StateInitializationContext stateContext = getStateContext();
+               operator.initializeState(stateContext);
+               operator.open();
+               operator.snapshotState(new 
StateSnapshotContextSynchronousImpl(100L, 100L));
+               operator.notifyCheckpointComplete(100L);
+               assertEquals(100L, (long) 
mockSourceReader.getCompletedCheckpoints().get(0));
+       }
+
+       @Test
+       public void testNotifyCheckpointAborted() throws Exception {
+               StateInitializationContext stateContext = getStateContext();
+               operator.initializeState(stateContext);
+               operator.open();
+               operator.snapshotState(new 
StateSnapshotContextSynchronousImpl(100L, 100L));
+               operator.notifyCheckpointAborted(100L);
+               assertEquals(100L, (long) 
mockSourceReader.getAbortedCheckpoints().get(0));
+       }
+
        // ---------------- helper methods -------------------------
 
        private StateInitializationContext getStateContext() throws Exception {
@@ -190,4 +228,15 @@ public class SourceOperatorTest {
                return abstractStateBackend.createOperatorStateBackend(
                        env, "test-operator", Collections.emptyList(), 
cancelStreamRegistry);
        }
+
+       private Environment getTestingEnvironment() {
+               return new StreamMockEnvironment(
+                               new Configuration(),
+                               new Configuration(),
+                               new ExecutionConfig(),
+                               1L,
+                               new MockInputSplitProvider(),
+                               1,
+                               new TestTaskStateManager());
+       }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index c189001..d48ed13 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -239,7 +239,7 @@ public class SourceOperatorEventTimeTest {
                }
 
                @Override
-               public List<MockSourceSplit> snapshotState() {
+               public List<MockSourceSplit> snapshotState(long checkpointId) {
                        throw new UnsupportedOperationException();
                }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index a133ace..14b94eb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -43,6 +43,8 @@ public class TestingSourceOperator<T>  extends 
SourceOperator<T, MockSourceSplit
        private final int subtaskIndex;
        private final int parallelism;
 
+       private volatile boolean readerCreated;
+
        public TestingSourceOperator(
                        SourceReader<T, MockSourceSplit> reader,
                        WatermarkStrategy<T> watermarkStrategy,
@@ -79,6 +81,13 @@ public class TestingSourceOperator<T>  extends 
SourceOperator<T, MockSourceSplit
                this.subtaskIndex = subtaskIndex;
                this.parallelism = parallelism;
                this.metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+               this.readerCreated = false;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               readerCreated = true;
        }
 
        @Override
@@ -93,4 +102,8 @@ public class TestingSourceOperator<T>  extends 
SourceOperator<T, MockSourceSplit
                cfg.setAutoWatermarkInterval(100);
                return cfg;
        }
+
+       public boolean isReaderCreated() {
+               return readerCreated;
+       }
 }

Reply via email to