This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ed40049  [FLINK-15101][connector/common] Add the SourceCoordinator implementation
ed40049 is described below

commit ed400497e56ea272722ac71697edf830b2d682ae
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Mar 23 08:53:31 2020 +0800

    [FLINK-15101][connector/common] Add the SourceCoordinator implementation
---
 .../flink/api/connector/source/ReaderInfo.java     |  15 ++
 .../api/connector/source/SplitEnumerator.java      |   3 +-
 .../connector/source/SplitEnumeratorContext.java   |  26 +-
 .../api/connector/source/mocks/MockSource.java     |  75 ++++++
 .../connector/source/mocks/MockSourceSplit.java    |   2 +-
 .../source/mocks/MockSplitEnumerator.java          | 139 +++++++++++
 .../MockSplitEnumeratorCheckpointSerializer.java   |  55 ++++
 .../source/coordinator/ExecutorNotifier.java       | 137 ++++++++++
 .../source/coordinator/SourceCoordinator.java      | 278 +++++++++++++++++++++
 .../coordinator/SourceCoordinatorContext.java      | 274 ++++++++++++++++++++
 .../coordinator/SourceCoordinatorProvider.java     | 112 +++++++++
 .../coordinator/SourceCoordinatorSerdeUtils.java   | 179 +++++++++++++
 .../source/coordinator/SplitAssignmentTracker.java | 164 ++++++++++++
 .../flink/runtime/source/event/AddSplitEvent.java  |  38 ++-
 .../source/event/ReaderRegistrationEvent.java      |  29 +--
 .../runtime/source/event/SourceEventWrapper.java   |  36 ++-
 .../MockOperatorCoordinatorContext.java            | 105 ++++++++
 .../source/coordinator/CoordinatorTestUtils.java   |  84 +++++++
 .../coordinator/SourceCoordinatorContextTest.java  | 188 ++++++++++++++
 .../source/coordinator/SourceCoordinatorTest.java  | 214 ++++++++++++++++
 .../coordinator/SourceCoordinatorTestBase.java     |  86 +++++++
 .../coordinator/SplitAssignmentTrackerTest.java    | 173 +++++++++++++
 22 files changed, 2341 insertions(+), 71 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index f899b12..4a048d1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.connector.source;
 import org.apache.flink.annotation.Public;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * A container class hosting the information of a {@link SourceReader}.
@@ -48,4 +49,18 @@ public final class ReaderInfo implements Serializable {
        public String getLocation() {
                return location;
        }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(subtaskId, location);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (!(obj instanceof ReaderInfo)) {
+                       return false;
+               }
+               ReaderInfo other = (ReaderInfo) obj;
+               return subtaskId == other.subtaskId && 
location.equals(other.location);
+       }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index bdaee36..16b3938 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -66,8 +66,9 @@ public interface SplitEnumerator<SplitT extends SourceSplit, 
CheckpointT> extend
         * Checkpoints the state of this split enumerator.
         *
         * @return an object containing the state of the split enumerator.
+        * @throws Exception when the snapshot cannot be taken.
         */
-       CheckpointT snapshotState();
+       CheckpointT snapshotState() throws Exception;
 
        /**
         * Called to close the enumerator, in case it holds on to any 
resources, like threads or
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 0dd004a..db6ccad 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -48,11 +48,13 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
        void sendEventToSourceReader(int subtaskId, SourceEvent event);
 
        /**
-        * Get the number of subtasks.
+        * Get the current parallelism of this Source. Note that due to 
auto-scaling, the parallelism
+        * may change over time. Therefore the SplitEnumerator should not cache 
the return value
+        * of this method, but always invoke this method to get the latest 
parallelism.
         *
-        * @return the number of subtasks.
+        * @return the parallelism of the Source.
         */
-       int numSubtasks();
+       int currentParallelism();
 
        /**
         * Get the currently registered readers. The mapping is from subtask id 
to the reader info.
@@ -70,10 +72,12 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
 
        /**
         * Invoke the callable and handover the return value to the handler 
which will be executed
-        * by the source coordinator.
+        * by the source coordinator. When this method is invoked multiple times, the <code>Callable</code>s
+        * may be executed in a thread pool concurrently.
         *
-        * <p>It is important to make sure that the callable should not modify
-        * any shared state. Otherwise the there might be unexpected behavior.
+        * <p>It is important to make sure that the callable does not modify any shared state, especially
+        * the state that will be part of the {@link SplitEnumerator#snapshotState()}. Otherwise there
+        * might be unexpected behavior.
         *
         * @param callable a callable to call.
         * @param handler a handler that handles the return value of or the 
exception thrown from the callable.
@@ -81,11 +85,13 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
        <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> 
handler);
 
        /**
-        * Invoke the callable and handover the return value to the handler 
which will be executed
-        * by the source coordinator.
+        * Invoke the given callable periodically and hand over the return value to the handler which will
+        * be executed by the source coordinator. When this method is invoked multiple times, the
+        * <code>Callable</code>s may be executed in a thread pool concurrently.
         *
-        * <p>It is important to make sure that the callable should not modify
-        * any shared state. Otherwise the there might be unexpected behavior.
+        * <p>It is important to make sure that the callable does not modify any shared state, especially
+        * the state that will be part of the {@link SplitEnumerator#snapshotState()}. Otherwise there
+        * might be unexpected behavior.
         *
         * @param callable the callable to call.
         * @param handler a handler that handles the return value of or the 
exception thrown from the callable.
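
For illustration, a minimal sketch of how a SplitEnumerator might follow the callAsync() contract described above: the potentially slow work happens in the callable, and all state changes and split assignments happen in the handler, which the source coordinator runs on its own thread. The class name, discoverSplits() and the one-minute period are hypothetical placeholders; SplitEnumeratorContext, SplitsAssignment and MockSourceSplit are the types touched by this patch.

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;

import java.util.Collections;
import java.util.List;

final class PeriodicDiscoverySketch {

	static void schedulePeriodicDiscovery(SplitEnumeratorContext<MockSourceSplit> context) {
		context.callAsync(
			PeriodicDiscoverySketch::discoverSplits,       // runs in the worker thread pool
			(splits, error) -> {                           // runs on the coordinator thread
				if (error != null) {
					return;                                // a real enumerator would surface the failure
				}
				for (MockSourceSplit split : splits) {
					// Query the parallelism on every use instead of caching it.
					int subtask = Integer.parseInt(split.splitId()) % context.currentParallelism();
					if (context.registeredReaders().containsKey(subtask)) {
						context.assignSplits(new SplitsAssignment<>(
							Collections.singletonMap(subtask, Collections.singletonList(split))));
					}
				}
			},
			0L,                                            // initial delay in ms
			60_000L);                                      // discovery period in ms
	}

	// Hypothetical discovery call; a real source would query an external system here.
	private static List<MockSourceSplit> discoverSplits() {
		return Collections.singletonList(new MockSourceSplit(0));
	}
}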
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
new file mode 100644
index 0000000..f38ca60
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
@@ -0,0 +1,75 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * A mock {@link Source} for unit tests.
+ */
+public class MockSource implements Source<Integer, MockSourceSplit, 
Set<MockSourceSplit>> {
+       private final Boundedness boundedness;
+       private final int numSplits;
+
+       public MockSource(Boundedness boundedness, int numSplits) {
+               this.boundedness = boundedness;
+               this.numSplits = numSplits;
+       }
+
+       @Override
+       public Boundedness getBoundedness() {
+               return boundedness;
+       }
+
+       @Override
+       public SourceReader<Integer, MockSourceSplit> 
createReader(SourceReaderContext readerContext) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> 
createEnumerator(SplitEnumeratorContext<MockSourceSplit> enumContext) {
+               return new MockSplitEnumerator(numSplits, enumContext);
+       }
+
+       @Override
+       public SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> 
restoreEnumerator(
+                       SplitEnumeratorContext<MockSourceSplit> enumContext,
+                       Set<MockSourceSplit> checkpoint) throws IOException {
+               return new MockSplitEnumerator(checkpoint, enumContext);
+       }
+
+       @Override
+       public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
+               return new MockSourceSplitSerializer();
+       }
+
+       @Override
+       public SimpleVersionedSerializer<Set<MockSourceSplit>> 
getEnumeratorCheckpointSerializer() {
+               return new MockSplitEnumeratorCheckpointSerializer();
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
index 08a7ed8..9dbc9de 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceSplit.java
@@ -90,7 +90,7 @@ public class MockSourceSplit implements SourceSplit, 
Serializable {
 
        @Override
        public int hashCode() {
-               return Objects.hash(id, records.toArray(new Integer[0]), 
endIndex, index);
+               return Objects.hash(id, Arrays.hashCode(records.toArray(new 
Integer[0])), endIndex, index);
        }
 
        @Override
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
new file mode 100644
index 0000000..64ec8af
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -0,0 +1,139 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A mock {@link SplitEnumerator} for unit tests.
+ */
+public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, 
Set<MockSourceSplit>> {
+       private SortedSet<MockSourceSplit> unassignedSplits;
+       private SplitEnumeratorContext<MockSourceSplit> enumContext;
+       private List<SourceEvent> handledSourceEvent;
+       private boolean started;
+       private boolean closed;
+
+       public MockSplitEnumerator(int numSplits, 
SplitEnumeratorContext<MockSourceSplit> enumContext) {
+               this(new HashSet<>(), enumContext);
+               for (int i = 0; i < numSplits; i++) {
+                       unassignedSplits.add(new MockSourceSplit(i));
+               }
+       }
+
+       public MockSplitEnumerator(
+                       Set<MockSourceSplit> unassignedSplits,
+                       SplitEnumeratorContext<MockSourceSplit> enumContext) {
+               this.unassignedSplits = new TreeSet<>(Comparator.comparingInt(o 
-> Integer.parseInt(o.splitId())));
+               this.unassignedSplits.addAll(unassignedSplits);
+               this.enumContext = enumContext;
+               this.handledSourceEvent = new ArrayList<>();
+               this.started = false;
+               this.closed = false;
+       }
+
+       @Override
+       public void start() {
+               this.started = true;
+       }
+
+       @Override
+       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+               handledSourceEvent.add(sourceEvent);
+       }
+
+       @Override
+       public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
+               unassignedSplits.addAll(splits);
+       }
+
+       @Override
+       public void addReader(int subtaskId) {
+               List<MockSourceSplit> assignment = new ArrayList<>();
+               for (MockSourceSplit split : unassignedSplits) {
+                       if (Integer.parseInt(split.splitId()) % 
enumContext.currentParallelism() == subtaskId) {
+                               assignment.add(split);
+                       }
+               }
+               enumContext.assignSplits(new 
SplitsAssignment<>(Collections.singletonMap(subtaskId, assignment)));
+               unassignedSplits.removeAll(assignment);
+       }
+
+       @Override
+       public Set<MockSourceSplit> snapshotState() {
+               return unassignedSplits;
+       }
+
+       @Override
+       public void close() throws IOException {
+               this.closed = true;
+       }
+
+       public void addNewSplits(List<MockSourceSplit> newSplits) {
+               unassignedSplits.addAll(newSplits);
+               assignAllSplits();
+       }
+
+       // --------------------
+
+       public boolean started() {
+               return started;
+       }
+
+       public boolean closed() {
+               return closed;
+       }
+
+       public Set<MockSourceSplit> getUnassignedSplits() {
+               return unassignedSplits;
+       }
+
+       public List<SourceEvent> getHandledSourceEvent() {
+               return handledSourceEvent;
+       }
+
+       // --------------------
+
+       private void assignAllSplits() {
+               Map<Integer, List<MockSourceSplit>> assignment = new 
HashMap<>();
+               unassignedSplits.forEach(split -> {
+                       int subtaskId = Integer.parseInt(split.splitId()) % 
enumContext.currentParallelism();
+                       if 
(enumContext.registeredReaders().containsKey(subtaskId)) {
+                               assignment.computeIfAbsent(subtaskId, ignored 
-> new ArrayList<>()).add(split);
+                       }
+               });
+               enumContext.assignSplits(new SplitsAssignment<>(assignment));
+               assignment.values().forEach(l -> unassignedSplits.removeAll(l));
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
new file mode 100644
index 0000000..4879b18
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorCheckpointSerializer.java
@@ -0,0 +1,55 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.mocks;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serializer for the checkpoint of {@link MockSplitEnumerator}.
+ */
+public class MockSplitEnumeratorCheckpointSerializer implements 
SimpleVersionedSerializer<Set<MockSourceSplit>> {
+
+       @Override
+       public int getVersion() {
+               return 0;
+       }
+
+       @Override
+       public byte[] serialize(Set<MockSourceSplit> obj) throws IOException {
+               return InstantiationUtil.serializeObject(new ArrayList<>(obj));
+       }
+
+       @Override
+       public Set<MockSourceSplit> deserialize(int version, byte[] serialized) 
throws IOException {
+               try {
+                       ArrayList<MockSourceSplit> list = 
InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+                       return new HashSet<>(list);
+               } catch (ClassNotFoundException e) {
+                       throw new FlinkRuntimeException("Failed to deserialize 
the enumerator checkpoint.");
+               }
+       }
+
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
new file mode 100644
index 0000000..e730a15
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -0,0 +1,137 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * This class is used to coordinate between two components, where one 
component has an
+ * executor following the mailbox model and the other component notifies it 
when needed.
+ */
+public class ExecutorNotifier {
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorNotifier.class);
+       private ScheduledExecutorService workerExecutor;
+       private Executor executorToNotify;
+
+       public ExecutorNotifier(ScheduledExecutorService workerExecutor,
+                                                       Executor 
executorToNotify) {
+               this.executorToNotify = executorToNotify;
+               this.workerExecutor = workerExecutor;
+       }
+
+       /**
+        * Call the given callable once. Notify the {@link #executorToNotify} 
to execute
+        * the handler.
+        *
+        * <p>Note that when this method is invoked multiple times, it is 
possible that
+        * multiple callables are executed concurrently, and so are the handlers. For example,
+        * assuming both the workerExecutor and executorToNotify are single threaded,
+        * the following code may still throw a <code>ConcurrentModificationException</code>.
+        *
+        * <pre>{@code
+        *  final List<Integer> list = new ArrayList<>();
+        *
+        *  // The callable adds an integer 1 to the list. While this works at first glance,
+        *  // a ConcurrentModificationException may be thrown because the callable and the
+        *  // handler may modify the list at the same time.
+        *  notifier.notifyReadyAsync(
+        *      () -> list.add(1),
+        *      (ignoredValue, ignoredThrowable) -> list.add(2));
+        * }</pre>
+        *
+        * <p>Instead, the above logic should be implemented as:
+        * <pre>{@code
+        *  // Modify the state in the handler.
+        *  notifier.notifyReadyAsync(() -> 1, (v, ignoredThrowable) -> {
+        *      list.add(v);
+        *      list.add(2);
+        *  });
+        * }</pre>
+        *
+        * @param callable the callable to invoke before notifying the executor.
+        * @param handler the handler to handle the result of the callable.
+        */
+       public <T> void notifyReadyAsync(Callable<T> callable, BiConsumer<T, 
Throwable> handler) {
+               workerExecutor.execute(() -> {
+                       try {
+                               T result = callable.call();
+                               executorToNotify.execute(() -> 
handler.accept(result, null));
+                       } catch (Throwable t) {
+                               LOG.error("Unexpected exception.", t);
+                               handler.accept(null, t);
+                       }
+               });
+       }
+
+       /**
+        * Call the given callable periodically and notify the {@link #executorToNotify} to execute
+        * the handler each time.
+        *
+        * <p>Note that when this method is invoked multiple times, it is 
possible that
+        * multiple callables are executed concurrently, and so are the handlers. For example,
+        * assuming both the workerExecutor and executorToNotify are single threaded,
+        * the following code may still throw a <code>ConcurrentModificationException</code>.
+        *
+        * <pre>{@code
+        *  final List<Integer> list = new ArrayList<>();
+        *
+        *  // The callable adds an integer 1 to the list. While this works at first glance,
+        *  // a ConcurrentModificationException may be thrown because the callable and the
+        *  // handler may modify the list at the same time.
+        *  notifier.notifyReadyAsync(
+        *      () -> list.add(1),
+        *      (ignoredValue, ignoredThrowable) -> list.add(2));
+        * }</pre>
+        *
+        * <p>Instead, the above logic should be implemented as:
+        * <pre>{@code
+        *  // Modify the state in the handler.
+        *  notifier.notifyReadyAsync(() -> 1, (v, ignoredThrowable) -> {
+        *      list.add(v);
+        *      list.add(2);
+        *  });
+        * }</pre>
+        *
+        * @param callable the callable to execute before notifying the executor.
+        * @param handler the handler that handles the result from the callable.
+        * @param initialDelayMs the initial delay in ms before invoking the 
given callable.
+        * @param periodMs the interval in ms to invoke the callable.
+        */
+       public <T> void notifyReadyAsync(
+                       Callable<T> callable,
+                       BiConsumer<T, Throwable> handler,
+                       long initialDelayMs,
+                       long periodMs) {
+               workerExecutor.scheduleAtFixedRate(() -> {
+                       try {
+                               T result = callable.call();
+                               executorToNotify.execute(() -> 
handler.accept(result, null));
+                       } catch (Throwable t) {
+                               handler.accept(null, t);
+                       }
+               }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
+       }
+}
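
A small usage sketch of the class above (the wiring and the sleep are illustrative, not part of this patch): the worker executor produces a value, and the single-threaded executor being notified is the only thread that mutates the shared list, matching the javadoc examples.

import org.apache.flink.runtime.source.coordinator.ExecutorNotifier;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

final class ExecutorNotifierSketch {

	public static void main(String[] args) throws Exception {
		ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
		ExecutorService mailbox = Executors.newSingleThreadExecutor();
		ExecutorNotifier notifier = new ExecutorNotifier(worker, mailbox);

		// Only ever touched from the mailbox thread, inside the handler.
		List<Integer> state = new ArrayList<>();
		notifier.notifyReadyAsync(
			() -> 1,                    // computed on the worker thread
			(value, error) -> {         // applied on the mailbox thread
				if (error == null) {
					state.add(value);
				}
			});

		// Give the round trip a moment to complete; real code would tie the executor
		// lifecycle to the owning component instead of sleeping.
		Thread.sleep(1000);
		worker.shutdown();
		mailbox.shutdown();
	}
}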
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
new file mode 100644
index 0000000..4fc2aeb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -0,0 +1,278 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
+import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readBytes;
+import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the 
{@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread 
model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations 
are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history 
per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link 
org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContext} and shares it with the enumerator. When the 
coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls the corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> 
implements OperatorCoordinator {
+       private static final Logger LOG = 
LoggerFactory.getLogger(SourceCoordinator.class);
+       /** The name of the operator this SourceCoordinator is associated with. 
*/
+       private final String operatorName;
+       /** A single-thread executor to handle all the changes to the 
coordinator. */
+       private final ExecutorService coordinatorExecutor;
+       /** The Source that is associated with this SourceCoordinator. */
+       private final Source<?, SplitT, EnumChkT> source;
+       /** The serializer that handles the serde of the SplitEnumerator 
checkpoints. */
+       private final SimpleVersionedSerializer<EnumChkT> 
enumCheckpointSerializer;
+       /** The serializer for the SourceSplit of the associated Source. */
+       private final SimpleVersionedSerializer<SplitT> splitSerializer;
+       /** The context containing the states of the coordinator. */
+       private final SourceCoordinatorContext<SplitT> context;
+       /** The split enumerator created from the associated Source. */
+       private SplitEnumerator<SplitT, EnumChkT> enumerator;
+       /** A flag marking whether the coordinator has started. */
+       private boolean started;
+
+       public SourceCoordinator(
+                       String operatorName,
+                       ExecutorService coordinatorExecutor,
+                       Source<?, SplitT, EnumChkT> source,
+                       SourceCoordinatorContext<SplitT> context) {
+               this.operatorName = operatorName;
+               this.coordinatorExecutor = coordinatorExecutor;
+               this.source = source;
+               this.enumCheckpointSerializer = 
source.getEnumeratorCheckpointSerializer();
+               this.splitSerializer = source.getSplitSerializer();
+               this.context = context;
+               this.enumerator = source.createEnumerator(context);
+               this.started = false;
+       }
+
+       @Override
+       public void start() throws Exception {
+               LOG.info("Starting split enumerator for source {}.", 
operatorName);
+               enumerator.start();
+               started = true;
+       }
+
+       @Override
+       public void close() throws Exception {
+               LOG.info("Closing SourceCoordinator for source {}.", 
operatorName);
+               boolean successfullyClosed = false;
+               try {
+                       if (started) {
+                               enumerator.close();
+                       }
+               } finally {
+                       coordinatorExecutor.shutdownNow();
+                       // We do not expect this to actually block for long. At 
this point, there should be very few tasks running
+                       // in the executor, if any.
+                       successfullyClosed = 
coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
+               }
+               if (!successfullyClosed) {
+                       throw new TimeoutException("The source coordinator 
failed to close before timeout.");
+               }
+               LOG.info("Source coordinator for source {} closed.", 
operatorName);
+       }
+
+       @Override
+       public void handleEventFromOperator(int subtask, OperatorEvent event) 
throws Exception {
+               ensureStarted();
+               coordinatorExecutor.execute(() -> {
+                       try {
+                               LOG.debug("Handling event from subtask {} of 
source {}: {}", subtask, operatorName, event);
+                               if (event instanceof SourceEventWrapper) {
+                                       enumerator.handleSourceEvent(subtask, 
((SourceEventWrapper) event).getSourceEvent());
+                               } else if (event instanceof 
ReaderRegistrationEvent) {
+                                       
handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+                               }
+                       } catch (Exception e) {
+                               LOG.error("Failing the job due to exception 
when handling operator event {} from subtask {} " +
+                                                               "of source 
{}.", event, subtask, operatorName, e);
+                               context.failJob(e);
+                       }
+               });
+       }
+
+       @Override
+       public void subtaskFailed(int subtaskId) {
+               ensureStarted();
+               coordinatorExecutor.execute(() -> {
+                       try {
+                               LOG.info("Handling subtask {} failure of source 
{}.", subtaskId, operatorName);
+                               List<SplitT> splitsToAddBack = 
context.getAndRemoveUncheckpointedAssignment(subtaskId);
+                               context.unregisterSourceReader(subtaskId);
+                               LOG.debug("Adding {} back to the split 
enumerator of source {}.", splitsToAddBack, operatorName);
+                               enumerator.addSplitsBack(splitsToAddBack, 
subtaskId);
+                       } catch (Exception e) {
+                               LOG.error("Failing the job due to exception 
when handling subtask {} failure in source {}.",
+                                               subtaskId, operatorName, e);
+                               context.failJob(e);
+                       }
+               });
+       }
+
+       @Override
+       public CompletableFuture<byte[]> checkpointCoordinator(long 
checkpointId) throws Exception {
+               ensureStarted();
+               return CompletableFuture.supplyAsync(() -> {
+                       try {
+                               LOG.debug("Taking a state snapshot on operator 
{} for checkpoint {}", operatorName, checkpointId);
+                               return toBytes(checkpointId);
+                       } catch (Exception e) {
+                               throw new CompletionException(
+                                               String.format("Failed to 
checkpoint coordinator for source %s due to ", operatorName), e);
+                       }
+               }, coordinatorExecutor);
+       }
+
+       @Override
+       public void checkpointComplete(long checkpointId) {
+               ensureStarted();
+               coordinatorExecutor.execute(() -> {
+                       try {
+                               LOG.info("Marking checkpoint {} as completed 
for source {}.", checkpointId, operatorName);
+                               context.onCheckpointComplete(checkpointId);
+                       } catch (Exception e) {
+                               LOG.error("Failing the job due to exception 
when completing the checkpoint {} for source {}.",
+                                               checkpointId, operatorName, e);
+                               context.failJob(e);
+                       }
+               });
+       }
+
+       @Override
+       public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+               if (started) {
+                       throw new IllegalStateException(String.format(
+                                       "The coordinator for source %s has 
started. The source coordinator state can " +
+                                       "only be reset to a checkpoint before 
it starts.", operatorName));
+               }
+               LOG.info("Resetting coordinator of source {} from checkpoint.", 
operatorName);
+               if (started) {
+                       enumerator.close();
+               }
+               LOG.info("Resetting SourceCoordinator from checkpoint.");
+               fromBytes(checkpointData);
+       }
+
+       // ---------------------------------------------------
+       @VisibleForTesting
+       SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
+               return enumerator;
+       }
+
+       @VisibleForTesting
+       SourceCoordinatorContext<SplitT> getContext() {
+               return context;
+       }
+
+       // --------------------- Serde -----------------------
+
+       /**
+        * Serialize the coordinator state. The current implementation may not 
be super efficient,
+        * but it should not matter that much because most of the state should 
be rather small.
+        * Large states themselves may already be a problem regardless of how 
the serialization
+        * is implemented.
+        *
+        * @return A byte array containing the serialized state of the source 
coordinator.
+        * @throws Exception When something goes wrong in serialization.
+        */
+       private byte[] toBytes(long checkpointId) throws Exception {
+               EnumChkT enumCkpt = enumerator.snapshotState();
+
+               try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                               DataOutputStream out = new 
DataOutputViewStreamWrapper(baos)) {
+                       writeCoordinatorSerdeVersion(out);
+                       out.writeInt(enumCheckpointSerializer.getVersion());
+                       byte[] serializedEnumChkpt = enumCheckpointSerializer.serialize(enumCkpt);
+                       out.writeInt(serializedEnumChkpt.length);
+                       out.write(serializedEnumChkpt);
+                       context.snapshotState(checkpointId, splitSerializer, 
out);
+                       out.flush();
+                       return baos.toByteArray();
+               }
+       }
+
+       /**
+        * Restore the state of this source coordinator from the state bytes.
+        *
+        * @param bytes The checkpoint bytes that were returned from {@link 
#toBytes(long)}
+        * @throws Exception When the deserialization failed.
+        */
+       private void fromBytes(byte[] bytes) throws Exception {
+               try (ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes);
+                               DataInputStream in = new 
DataInputViewStreamWrapper(bais)) {
+                       readAndVerifyCoordinatorSerdeVersion(in);
+                       int enumSerializerVersion = in.readInt();
+                       int serializedEnumChkptSize = in.readInt();
+                       byte[] serializedEnumChkpt = readBytes(in, 
serializedEnumChkptSize);
+                       EnumChkT enumChkpt = 
enumCheckpointSerializer.deserialize(enumSerializerVersion, 
serializedEnumChkpt);
+                       context.restoreState(splitSerializer, in);
+                       enumerator = source.restoreEnumerator(context, 
enumChkpt);
+               }
+       }
+
+       // --------------------- private methods -------------
+
+       private void handleReaderRegistrationEvent(ReaderRegistrationEvent 
event) {
+               context.registerSourceReader(new ReaderInfo(event.subtaskId(), 
event.location()));
+               enumerator.addReader(event.subtaskId());
+       }
+
+       private void ensureStarted() {
+               if (!started) {
+                       throw new IllegalStateException("The coordinator has 
not started yet.");
+               }
+       }
+}
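
A hedged sketch of how the coordinator is expected to be wired up: the operator name and the runtime-supplied OperatorCoordinator.Context are placeholders, SourceCoordinatorProvider is the factory added further down in this patch, and MockSource comes from the flink-core test mocks.

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;

final class SourceCoordinatorWiringSketch {

	static OperatorCoordinator startCoordinator(OperatorCoordinator.Context runtimeContext) throws Exception {
		SourceCoordinatorProvider<MockSourceSplit> provider = new SourceCoordinatorProvider<>(
				"MySourceOperator",                                    // placeholder operator name
				new OperatorID(),
				new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 10),  // a Source with 10 mock splits
				1);                                                    // worker threads for callAsync()
		// create() builds the single-threaded coordinator executor, the context and the coordinator.
		OperatorCoordinator coordinator = provider.create(runtimeContext);
		coordinator.start();                                           // starts the SplitEnumerator
		return coordinator;
	}
}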
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
new file mode 100644
index 0000000..47fe564
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -0,0 +1,274 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readRegisteredReaders;
+import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeRegisteredReaders;
+
+/**
+ * A context class for the {@link OperatorCoordinator}. Compared with {@link 
SplitEnumeratorContext} this class
+ * allows interaction with state and sending {@link OperatorEvent} to the 
SourceOperator while
+ * {@link SplitEnumeratorContext} only allows sending {@link SourceEvent}.
+ *
+ * <p>The context serves a few purposes:
+ * <ul>
+ *     <li>
+ *         Information provider - The context provides the necessary information to the enumerator for it to
+ *         know the status of the source readers and their split assignments. This information
+ *         allows the split enumerator to do the coordination.
+ *     </li>
+ *     <li>
+ *         Action taker - The context also provides a few actions that the 
enumerator can take to carry
+ *         out the coordination. So far there are two actions: 1) assigning splits to the source readers,
+ *         and 2) sending custom {@link SourceEvent SourceEvents} to the source readers.
+ *     </li>
+ *     <li>
+ *         Thread model enforcement - The context ensures that all the 
manipulations to the coordinator state
+ *         are handled by the same thread.
+ *     </li>
+ * </ul>
+ * @param <SplitT> the type of the splits.
+ */
+@Internal
+public class SourceCoordinatorContext<SplitT extends SourceSplit> implements 
SplitEnumeratorContext<SplitT> {
+       private final ExecutorService coordinatorExecutor;
+       private final ExecutorNotifier notifier;
+       private final OperatorCoordinator.Context operatorCoordinatorContext;
+       private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
+       private final SplitAssignmentTracker<SplitT> assignmentTracker;
+       private final 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory;
+       private final String coordinatorThreadName;
+
+       public SourceCoordinatorContext(
+                       ExecutorService coordinatorExecutor,
+                       
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+                       int numWorkerThreads,
+                       OperatorCoordinator.Context operatorCoordinatorContext) 
{
+               this(coordinatorExecutor, coordinatorThreadFactory, 
numWorkerThreads, operatorCoordinatorContext,
+                               new SplitAssignmentTracker<>());
+       }
+
+       // Package private method for unit test.
+       SourceCoordinatorContext(
+                       ExecutorService coordinatorExecutor,
+                       
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory,
+                       int numWorkerThreads,
+                       OperatorCoordinator.Context operatorCoordinatorContext,
+                       SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+               this.coordinatorExecutor = coordinatorExecutor;
+               this.coordinatorThreadFactory = coordinatorThreadFactory;
+               this.operatorCoordinatorContext = operatorCoordinatorContext;
+               this.registeredReaders = new ConcurrentHashMap<>();
+               this.assignmentTracker = splitAssignmentTracker;
+               this.coordinatorThreadName = 
coordinatorThreadFactory.getCoordinatorThreadName();
+               this.notifier = new ExecutorNotifier(
+                               
Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory() {
+                                       private int index = 0;
+                                       @Override
+                                       public Thread newThread(Runnable r) {
+                                               return new Thread(r, 
coordinatorThreadName + "-worker-" + index++);
+                                       }
+                               }),
+                               coordinatorExecutor);
+       }
+
+       @Override
+       public MetricGroup metricGroup() {
+               return null;
+       }
+
+       @Override
+       public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+               try {
+                       operatorCoordinatorContext.sendEvent(new 
SourceEventWrapper(event), subtaskId);
+               } catch (TaskNotRunningException e) {
+                       throw new FlinkRuntimeException(String.format("Failed 
to send event %s to subtask %d",
+                                       event,
+                                       subtaskId), e);
+               }
+       }
+
+       @Override
+       public int currentParallelism() {
+               return operatorCoordinatorContext.currentParallelism();
+       }
+
+       @Override
+       public Map<Integer, ReaderInfo> registeredReaders() {
+               return Collections.unmodifiableMap(registeredReaders);
+       }
+
+       @Override
+       public void assignSplits(SplitsAssignment<SplitT> assignment) {
+               // Ensure the split assignment is done by the coordinator 
executor.
+               if 
(!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+                       try {
+                               coordinatorExecutor.submit(() -> 
assignSplits(assignment)).get();
+                               return;
+                       } catch (InterruptedException | ExecutionException e) {
+                               throw new FlinkRuntimeException("Failed to 
assign splits due to", e);
+                       }
+               }
+
+               // Ensure all the subtasks in the assignment have registered.
+               for (Integer subtaskId : assignment.assignment().keySet()) {
+                       if (!registeredReaders.containsKey(subtaskId)) {
+                               throw new 
IllegalArgumentException(String.format(
+                                               "Cannot assign splits %s to 
subtask %d because the subtask is not registered.",
+                                               assignment.assignment().get(subtaskId), subtaskId));
+                       }
+               }
+
+               assignmentTracker.recordSplitAssignment(assignment);
+               assignment.assignment().forEach(
+                               (id, splits) -> {
+                                       try {
+                                               
operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+                                       } catch (TaskNotRunningException e) {
+                                               throw new 
FlinkRuntimeException(String.format(
+                                                               "Failed to 
assign splits %s to reader %d.", splits, id), e);
+                                       }
+                               });
+       }
+
+       @Override
+       public <T> void callAsync(
+                       Callable<T> callable,
+                       BiConsumer<T, Throwable> handler,
+                       long initialDelay,
+                       long period) {
+               notifier.notifyReadyAsync(callable, handler, initialDelay, 
period);
+       }
+
+       @Override
+       public <T> void callAsync(Callable<T> callable, BiConsumer<T, 
Throwable> handler) {
+               notifier.notifyReadyAsync(callable, handler);
+       }
+
+       // --------- Package private additional methods for the 
SourceCoordinator ------------
+
+       /**
+        * Fail the job with the given cause.
+        *
+        * @param cause the cause of the job failure.
+        */
+       void failJob(Throwable cause) {
+               operatorCoordinatorContext.failJob(cause);
+       }
+
+       /**
+        * Take a snapshot of this SourceCoordinatorContext.
+        *
+        * @param checkpointId The id of the ongoing checkpoint.
+        * @param splitSerializer The serializer of the splits.
+        * @param out A DataOutputStream to which the snapshot is written.
+        */
+       void snapshotState(
+                       long checkpointId,
+                       SimpleVersionedSerializer<SplitT> splitSerializer,
+                       DataOutputStream out) throws Exception {
+               writeRegisteredReaders(registeredReaders, out);
+               assignmentTracker.snapshotState(checkpointId, splitSerializer, 
out);
+       }
+
+       /**
+        * Restore the state of the context.
+        * @param splitSerializer the serializer for the SourceSplits.
+        * @param in the input from which the states are read.
+        * @throws Exception when the restoration failed.
+        */
+       @SuppressWarnings("unchecked")
+       void restoreState(
+                       SimpleVersionedSerializer<SplitT> splitSerializer,
+                       DataInputStream in) throws Exception {
+               Map<Integer, ReaderInfo> readers = readRegisteredReaders(in);
+               registeredReaders.clear();
+               registeredReaders.putAll(readers);
+               assignmentTracker.restoreState(splitSerializer, in);
+       }
+
+       /**
+        * Register a source reader.
+        *
+        * @param readerInfo the reader information of the source reader.
+        */
+       void registerSourceReader(ReaderInfo readerInfo) {
+               registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+       }
+
+       /**
+        * Unregister a source reader.
+        *
+        * @param subtaskId the subtask id of the source reader.
+        */
+       void unregisterSourceReader(int subtaskId) {
+               Preconditions.checkNotNull(registeredReaders.remove(subtaskId), 
String.format(
+                               "Failed to unregister source reader of id %s 
because it is not registered.", subtaskId));
+       }
+
+       /**
+        * Get the splits to put back. This only happens when a source reader 
subtask has failed.
+        *
+        * @param failedSubtaskId the failed subtask id.
+        * @return A list of splits that need to be added back to the {@link 
SplitEnumerator}.
+        */
+       List<SplitT> getAndRemoveUncheckpointedAssignment(int failedSubtaskId) {
+               return 
assignmentTracker.getAndRemoveUncheckpointedAssignment(failedSubtaskId);
+       }
+
+       /**
+        * Invoked when a successful checkpoint has been taken.
+        *
+        * @param checkpointId the id of the successful checkpoint.
+        */
+       void onCheckpointComplete(long checkpointId) {
+               assignmentTracker.onCheckpointComplete(checkpointId);
+       }
+}
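
To illustrate the "action taker" role described in the class javadoc above, a sketch of sending a custom event to one reader (ThrottleEvent is a hypothetical event type, assuming SourceEvent stays the plain marker interface it is in flink-core): the enumerator calls sendEventToSourceReader(), and this context wraps the event in a SourceEventWrapper OperatorEvent before handing it to the runtime.

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;

final class SendSourceEventSketch {

	// Hypothetical event asking one reader to slow down.
	private static final class ThrottleEvent implements SourceEvent {
		private static final long serialVersionUID = 1L;
	}

	static void throttleReader(SplitEnumeratorContext<MockSourceSplit> context, int subtaskId) {
		// Delivered to the reader wrapped in a SourceEventWrapper by the coordinator context.
		context.sendEventToSourceReader(subtaskId, new ThrottleEvent());
	}
}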
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
new file mode 100644
index 0000000..77ad7a3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -0,0 +1,112 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+/**
+ * The provider of {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorProvider<SplitT extends SourceSplit>
+               implements OperatorCoordinator.Provider {
+       private final String operatorName;
+       private final OperatorID operatorID;
+       private final Source<?, SplitT, ?> source;
+       private final int numWorkerThreads;
+
+       /**
+        * Construct the {@link SourceCoordinatorProvider}.
+        *
+        * @param operatorName the name of the operator.
+        * @param operatorID the ID of the operator this coordinator 
corresponds to.
+        * @param source the Source that will be used for this coordinator.
+        * @param numWorkerThreads the number of threads that should be provided to 
the SplitEnumerator
+        *                         for doing async calls. See
+        *                         {@link 
org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync(Callable,
 BiConsumer)
+        *                         SplitEnumeratorContext.callAsync()}.
+        */
+       public SourceCoordinatorProvider(
+                       String operatorName,
+                       OperatorID operatorID,
+                       Source<?, SplitT, ?> source,
+                       int numWorkerThreads) {
+               this.operatorName = operatorName;
+               this.operatorID = operatorID;
+               this.source = source;
+               this.numWorkerThreads = numWorkerThreads;
+       }
+
+       @Override
+       public OperatorID getOperatorId() {
+               return operatorID;
+       }
+
+       @Override
+       public OperatorCoordinator create(OperatorCoordinator.Context context) {
+               final String coordinatorThreadName = "SourceCoordinator-" + 
operatorName;
+               CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+                               new 
CoordinatorExecutorThreadFactory(coordinatorThreadName);
+               ExecutorService coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+               SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
+                               new 
SourceCoordinatorContext<>(coordinatorExecutor, coordinatorThreadFactory, 
numWorkerThreads, context);
+               return new SourceCoordinator<>(operatorName, 
coordinatorExecutor, source, sourceCoordinatorContext);
+       }
+
+       /**
+        * A thread factory that creates the single coordinator thread and provides helper methods to identify it.
+        */
+       public static class CoordinatorExecutorThreadFactory implements 
ThreadFactory {
+               private final String coordinatorThreadName;
+               private Thread t;
+
+               CoordinatorExecutorThreadFactory(String coordinatorThreadName) {
+                       this.coordinatorThreadName = coordinatorThreadName;
+                       this.t = null;
+               }
+
+               @Override
+               public Thread newThread(Runnable r) {
+                       if (t != null) {
+                               throw new IllegalStateException("Should never 
happen. This factory should only be used by a " +
+                                               "SingleThreadExecutor.");
+                       }
+                       t = new Thread(r, coordinatorThreadName);
+                       
t.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+                       return t;
+               }
+
+               String getCoordinatorThreadName() {
+                       return coordinatorThreadName;
+               }
+
+               boolean isCurrentThreadCoordinatorThread() {
+                       return Thread.currentThread() == t;
+               }
+       }
+}
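
As a usage sketch (not part of this commit), the provider is the piece a job wires in to obtain the OperatorCoordinator for a Source. The sketch class, the "MySource" name, the parallelism of 3 and the use of the MockOperatorCoordinatorContext test class are assumptions for illustration:

    package org.apache.flink.runtime.source.coordinator;

    import org.apache.flink.api.connector.source.Source;
    import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
    import org.apache.flink.runtime.jobgraph.OperatorID;
    import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
    import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

    // Hypothetical wiring sketch; "source" stands for any Source<?, MockSourceSplit, ?>.
    class ProviderUsageSketch {

            static OperatorCoordinator createAndStart(Source<?, MockSourceSplit, ?> source) throws Exception {
                    OperatorID operatorId = new OperatorID(1234L, 5678L);
                    SourceCoordinatorProvider<MockSourceSplit> provider =
                                    new SourceCoordinatorProvider<>("MySource", operatorId, source, 1);
                    // create() builds the single coordinator thread, the context and the coordinator.
                    OperatorCoordinator coordinator =
                                    provider.create(new MockOperatorCoordinatorContext(operatorId, 3));
                    coordinator.start();  // starts the SplitEnumerator of the source
                    return coordinator;
            }
    }
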
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
new file mode 100644
index 0000000..2855b2a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorSerdeUtils.java
@@ -0,0 +1,179 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+
+/**
+ * A serialization util class for the {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorSerdeUtils {
+       /** The current source coordinator serde version. */
+       private static final int CURRENT_VERSION = 0;
+
+       /** Private constructor for utility class. */
+       private SourceCoordinatorSerdeUtils() {}
+
+       /** Write the current serde version. */
+       static void writeCoordinatorSerdeVersion(DataOutputStream out) throws 
IOException {
+               out.writeInt(CURRENT_VERSION);
+       }
+
+       /** Read and verify the serde version. */
+       static void readAndVerifyCoordinatorSerdeVersion(DataInputStream in) 
throws IOException {
+               int version = in.readInt();
+               if (version > CURRENT_VERSION) {
+                       throw new IOException("Unsupported source coordinator 
serde version " + version);
+               }
+       }
+
+       /**
+        * Serialize the registered readers map.
+        *
+        * <p>The binary format is as follows:
+        * 4 Bytes - num entries.
+        * N Bytes - entries
+        *              4 Bytes - subtask id
+        *              N Bytes - reader info, see {@link 
#writeReaderInfo(ReaderInfo, DataOutputStream)}.
+        */
+       static void writeRegisteredReaders(Map<Integer, ReaderInfo> 
registeredReaders, DataOutputStream out) throws IOException {
+               out.writeInt(registeredReaders.size());
+               for (ReaderInfo info : registeredReaders.values()) {
+                       writeReaderInfo(info, out);
+               }
+       }
+
+       static Map<Integer, ReaderInfo> readRegisteredReaders(DataInputStream 
in) throws IOException {
+               int numReaders = in.readInt();
+               Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
+               for (int i = 0; i < numReaders; i++) {
+                       ReaderInfo info = readReaderInfo(in);
+                       registeredReaders.put(info.getSubtaskId(), info);
+               }
+               return registeredReaders;
+       }
+
+       /**
+        * Serialize the assignments by checkpoint id.
+        */
+       static <SplitT> void writeAssignmentsByCheckpointId(
+                       Map<Long, Map<Integer, LinkedHashSet<SplitT>>> 
assignmentByCheckpointIds,
+                       SimpleVersionedSerializer<SplitT> splitSerializer,
+                       DataOutputStream out) throws IOException {
+               // SplitSerializer version.
+               out.writeInt(splitSerializer.getVersion());
+               // Num checkpoints.
+               out.writeInt(assignmentByCheckpointIds.size());
+               for (Map.Entry<Long, Map<Integer, LinkedHashSet<SplitT>>> 
assignments : assignmentByCheckpointIds.entrySet()) {
+                       long checkpointId = assignments.getKey();
+                       out.writeLong(checkpointId);
+
+                       int numSubtasks = assignments.getValue().size();
+                       out.writeInt(numSubtasks);
+                       for (Map.Entry<Integer, LinkedHashSet<SplitT>> 
assignment : assignments.getValue().entrySet()) {
+                               int subtaskId = assignment.getKey();
+                               out.writeInt(subtaskId);
+
+                               int numAssignedSplits = 
assignment.getValue().size();
+                               out.writeInt(numAssignedSplits);
+                               for (SplitT split : assignment.getValue()) {
+                                       byte[] serializedSplit = 
splitSerializer.serialize(split);
+                                       out.writeInt(serializedSplit.length);
+                                       out.write(serializedSplit);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Deserialize the assignments by checkpoint id.
+        */
+       static <SplitT> Map<Long, Map<Integer, LinkedHashSet<SplitT>>> 
readAssignmentsByCheckpointId(
+                       DataInputStream in,
+                       SimpleVersionedSerializer<SplitT> splitSerializer) 
throws IOException {
+               int splitSerializerVersion = in.readInt();
+               int numCheckpoints = in.readInt();
+               Map<Long, Map<Integer, LinkedHashSet<SplitT>>> 
assignmentsByCheckpointIds = new HashMap<>(numCheckpoints);
+               for (int i = 0; i < numCheckpoints; i++) {
+                       long checkpointId = in.readLong();
+                       int numSubtasks = in.readInt();
+                       Map<Integer, LinkedHashSet<SplitT>> assignments = new 
HashMap<>();
+                       assignmentsByCheckpointIds.put(checkpointId, 
assignments);
+                       for (int j = 0; j < numSubtasks; j++) {
+                               int subtaskId = in.readInt();
+                               int numAssignedSplits = in.readInt();
+                               LinkedHashSet<SplitT> splits = new 
LinkedHashSet<>(numAssignedSplits);
+                               assignments.put(subtaskId, splits);
+                               for (int k = 0; k < numAssignedSplits; k++) {
+                                       int serializedSplitSize = in.readInt();
+                                       byte[] serializedSplit = readBytes(in, 
serializedSplitSize);
+                                       SplitT split = 
splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
+                                       splits.add(split);
+                               }
+                       }
+               }
+               return assignmentsByCheckpointIds;
+       }
+
+       static byte[] readBytes(DataInputStream in, int size) throws 
IOException {
+               byte[] bytes = new byte[size];
+               int off = 0;
+               // For a ByteArrayInputStream the read should succeed in one 
shot.
+               while (off < size) {
+                       int read = in.read(bytes, off, size - off);
+                       if (read < 0) {
+                               throw new BufferUnderflowException();
+                       }
+                       off += read;
+               }
+               return bytes;
+       }
+
+       // ----- private helper methods -----
+
+       /**
+        * Serialize {@link ReaderInfo}.
+        *
+        * <p>The binary format is as follows:
+        * 4 Bytes - subtask id
+        * N Bytes - location string
+        *
+        * @param readerInfo the given reader information to serialize.
+        */
+       private static void writeReaderInfo(ReaderInfo readerInfo, 
DataOutputStream out) throws IOException {
+               out.writeInt(readerInfo.getSubtaskId());
+               out.writeUTF(readerInfo.getLocation());
+       }
+
+       private static ReaderInfo readReaderInfo(DataInputStream in) throws 
IOException {
+               int subtaskId = in.readInt();
+               String location = in.readUTF();
+               return new ReaderInfo(subtaskId, location);
+       }
+
+}
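
To make the registered-readers format documented above concrete (4 bytes for the entry count, then a subtask id and a location per reader), here is a round-trip sketch that is not part of the commit; it assumes same-package access because the helpers are package-private:

    package org.apache.flink.runtime.source.coordinator;

    import org.apache.flink.api.connector.source.ReaderInfo;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical round-trip through the serde helpers.
    class SerdeRoundTripSketch {

            static Map<Integer, ReaderInfo> roundTrip() throws IOException {
                    Map<Integer, ReaderInfo> readers = new HashMap<>();
                    readers.put(0, new ReaderInfo(0, "taskmanager-0"));
                    readers.put(1, new ReaderInfo(1, "taskmanager-1"));

                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    try (DataOutputStream out = new DataOutputStream(baos)) {
                            SourceCoordinatorSerdeUtils.writeRegisteredReaders(readers, out);
                    }
                    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
                            // Equal to the original map, since ReaderInfo defines equals()/hashCode().
                            return SourceCoordinatorSerdeUtils.readRegisteredReaders(in);
                    }
            }
    }
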
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
new file mode 100644
index 0000000..7985f7d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTracker.java
@@ -0,0 +1,164 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAssignmentsByCheckpointId;
+import static 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.writeAssignmentsByCheckpointId;
+
+/**
+ * A class that is responsible for tracking the past split assignments made by
+ * {@link SplitEnumerator}.
+ */
+@Internal
+public class SplitAssignmentTracker<SplitT extends SourceSplit> {
+       // All the split assignments since the last successful checkpoint.
+       // Maintaining this allows the subtasks to fail over independently.
+       // The mapping is [CheckpointId -> [SubtaskId -> 
LinkedHashSet[SourceSplits]]].
+       private final SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> 
assignmentsByCheckpointId;
+       // The split assignments since the last checkpoint attempt.
+       // The mapping is [SubtaskId -> LinkedHashSet[SourceSplits]].
+       private Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments;
+
+       public SplitAssignmentTracker() {
+               this.assignmentsByCheckpointId = new TreeMap<>();
+               this.uncheckpointedAssignments = new HashMap<>();
+       }
+
+       /**
+        * Take a snapshot of the uncheckpointed split assignments.
+        *
+        * @param checkpointId the id of the ongoing checkpoint
+        */
+       public void snapshotState(
+                       long checkpointId,
+                       SimpleVersionedSerializer<SplitT> splitSerializer,
+                       DataOutputStream out) throws Exception {
+               // Include the uncheckpointed assignments to the snapshot.
+               assignmentsByCheckpointId.put(checkpointId, 
uncheckpointedAssignments);
+               uncheckpointedAssignments = new HashMap<>();
+               writeAssignmentsByCheckpointId(assignmentsByCheckpointId, 
splitSerializer, out);
+       }
+
+       /**
+        * Restore the state of the SplitAssignmentTracker.
+        *
+        * @param splitSerializer The serializer of the splits.
+        * @param in The DataInputStream that contains the state of the 
SplitAssignmentTracker.
+        * @throws Exception when the state deserialization fails.
+        */
+       @SuppressWarnings("unchecked")
+       public void restoreState(SimpleVersionedSerializer<SplitT> 
splitSerializer, DataInputStream in) throws Exception {
+               // Read the split assignments by checkpoint id.
+               Map<Long, Map<Integer, LinkedHashSet<SplitT>>> 
deserializedAssignments =
+                               readAssignmentsByCheckpointId(in, 
splitSerializer);
+               assignmentsByCheckpointId.putAll(deserializedAssignments);
+       }
+
+       /**
+        * When a checkpoint has been successfully made, this method is invoked 
to clean up the assignment
+        * history before this successful checkpoint.
+        *
+        * @param checkpointId the id of the successful checkpoint.
+        */
+       public void onCheckpointComplete(long checkpointId) {
+               assignmentsByCheckpointId.entrySet().removeIf(entry -> 
entry.getKey() <= checkpointId);
+       }
+
+       /**
+        * Record a new split assignment.
+        *
+        * @param splitsAssignment the new split assignment.
+        */
+       public void recordSplitAssignment(SplitsAssignment<SplitT> 
splitsAssignment) {
+               addSplitAssignment(splitsAssignment, uncheckpointedAssignments);
+       }
+
+       /**
+        * This method is invoked when a source reader fails over. In this 
case, the source reader will
+        * restore its split assignment to the last successful checkpoint. Any 
split assignment to that
+        * source reader after the last successful checkpoint will be lost on 
the source reader side as
+        * if those splits were never assigned. To handle this case, the 
coordinator needs to find those
+        * splits and return them back to the SplitEnumerator for re-assignment.
+        *
+        * @param failedSubtaskId the failed subtask id.
+        * @return A list of splits that need to be added back to the {@link 
SplitEnumerator}.
+        */
+       public List<SplitT> getAndRemoveUncheckpointedAssignment(int 
failedSubtaskId) {
+               List<SplitT> splits = new ArrayList<>();
+               assignmentsByCheckpointId.values().forEach(assignments -> {
+                       removeFromAssignment(failedSubtaskId, assignments, 
splits);
+               });
+               removeFromAssignment(failedSubtaskId, 
uncheckpointedAssignments, splits);
+               return splits;
+       }
+
+       // ------------- Methods visible for testing ----------------
+
+       @VisibleForTesting
+       SortedMap<Long, Map<Integer, LinkedHashSet<SplitT>>> 
assignmentsByCheckpointId() {
+               return assignmentsByCheckpointId;
+       }
+
+       @VisibleForTesting
+       Map<Integer, LinkedHashSet<SplitT>> assignmentsByCheckpointId(long 
checkpointId) {
+               return assignmentsByCheckpointId.get(checkpointId);
+       }
+
+       @VisibleForTesting
+       Map<Integer, LinkedHashSet<SplitT>> uncheckpointedAssignments() {
+               return uncheckpointedAssignments;
+       }
+
+       // -------------- private helpers ---------------
+
+       private void removeFromAssignment(
+                       int subtaskId,
+                       Map<Integer, LinkedHashSet<SplitT>> assignments,
+                       List<SplitT> toPutBack) {
+               Set<SplitT> splitForSubtask = assignments.remove(subtaskId);
+               if (splitForSubtask != null) {
+                       toPutBack.addAll(splitForSubtask);
+               }
+       }
+
+       private void addSplitAssignment(
+                       SplitsAssignment<SplitT> additionalAssignment,
+                       Map<Integer, LinkedHashSet<SplitT>> assignments) {
+               additionalAssignment.assignment().forEach((id, splits) ->
+                               assignments.computeIfAbsent(id, ignored -> new 
LinkedHashSet<>()).addAll(splits));
+       }
+}
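
To make the failover behavior of getAndRemoveUncheckpointedAssignment() concrete, here is a minimal lifecycle sketch that is not part of the commit; the class name and the split/subtask ids are made up, and the MockSourceSplit test split stands in for a real split type:

    package org.apache.flink.runtime.source.coordinator;

    import org.apache.flink.api.connector.source.SplitsAssignment;
    import org.apache.flink.api.connector.source.mocks.MockSourceSplit;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical walk through the tracker's bookkeeping.
    class SplitAssignmentTrackerSketch {

            static void failoverExample() {
                    SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();

                    // Record an assignment made since the last checkpoint:
                    // split 0 -> subtask 0, split 1 -> subtask 1.
                    Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
                    assignment.put(0, Collections.singletonList(new MockSourceSplit(0)));
                    assignment.put(1, Collections.singletonList(new MockSourceSplit(1)));
                    tracker.recordSplitAssignment(new SplitsAssignment<>(assignment));

                    // If subtask 0 now fails, its uncheckpointed splits are removed from the
                    // tracker and returned, so the SplitEnumerator can hand them out again.
                    List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0);
                    // splitsToPutBack holds the single split with id "0"; subtask 1 is unaffected.
            }
    }
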
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
similarity index 54%
copy from 
flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
index f899b12..5f6cabd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/AddSplitEvent.java
@@ -16,36 +16,30 @@
  limitations under the License.
  */
 
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
-import java.io.Serializable;
+import java.util.List;
 
 /**
- * A container class hosting the information of a {@link SourceReader}.
+ * An operator event that adds splits to a source reader.
+ *
+ * @param <SplitT> the type of splits.
  */
-@Public
-public final class ReaderInfo implements Serializable {
-       private final int subtaskId;
-       private final String location;
-
-       public ReaderInfo(int subtaskId, String location) {
-               this.subtaskId = subtaskId;
-               this.location = location;
+public class AddSplitEvent<SplitT> implements OperatorEvent {
+       private final List<SplitT> splits;
+
+       public AddSplitEvent(List<SplitT> splits) {
+               this.splits = splits;
        }
 
-       /**
-        * @return the ID of the subtask that runs the source reader.
-        */
-       public int getSubtaskId() {
-               return subtaskId;
+       public List<SplitT> splits() {
+               return splits;
        }
 
-       /**
-        * @return the location of the subtask that runs this source reader.
-        */
-       public String getLocation() {
-               return location;
+       @Override
+       public String toString() {
+               return String.format("AddSplitEvents[%s]", splits);
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
similarity index 61%
copy from 
flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
index f899b12..4cb2aeb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
@@ -16,36 +16,33 @@
  limitations under the License.
  */
 
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.annotation.Public;
-
-import java.io.Serializable;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 /**
- * A container class hosting the information of a {@link SourceReader}.
+ * An {@link OperatorEvent} that registers a {@link 
org.apache.flink.api.connector.source.SourceReader SourceReader}
+ * to the SourceCoordinator.
  */
-@Public
-public final class ReaderInfo implements Serializable {
+public class ReaderRegistrationEvent implements OperatorEvent {
        private final int subtaskId;
        private final String location;
 
-       public ReaderInfo(int subtaskId, String location) {
+       public ReaderRegistrationEvent(int subtaskId, String location) {
                this.subtaskId = subtaskId;
                this.location = location;
        }
 
-       /**
-        * @return the ID of the subtask that runs the source reader.
-        */
-       public int getSubtaskId() {
+       public int subtaskId() {
                return subtaskId;
        }
 
-       /**
-        * @return the location of the subtask that runs this source reader.
-        */
-       public String getLocation() {
+       public String location() {
                return location;
        }
+
+       @Override
+       public String toString() {
+               return String.format("ReaderRegistrationEvent[subtaskId = %d, 
location = %s]", subtaskId, location);
+       }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
similarity index 54%
copy from 
flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
index f899b12..0a5ffa3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/SourceEventWrapper.java
@@ -16,36 +16,30 @@
  limitations under the License.
  */
 
-package org.apache.flink.api.connector.source;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.annotation.Public;
-
-import java.io.Serializable;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 /**
- * A container class hosting the information of a {@link SourceReader}.
+ * A wrapper operator event that contains a custom defined {@link SourceEvent}.
  */
-@Public
-public final class ReaderInfo implements Serializable {
-       private final int subtaskId;
-       private final String location;
-
-       public ReaderInfo(int subtaskId, String location) {
-               this.subtaskId = subtaskId;
-               this.location = location;
+public class SourceEventWrapper implements OperatorEvent {
+       private final SourceEvent sourceEvent;
+
+       public SourceEventWrapper(SourceEvent sourceEvent) {
+               this.sourceEvent = sourceEvent;
        }
 
        /**
-        * @return the ID of the subtask that runs the source reader.
+        * @return The {@link SourceEvent} in this SourceEventWrapper.
         */
-       public int getSubtaskId() {
-               return subtaskId;
+       public SourceEvent getSourceEvent() {
+               return sourceEvent;
        }
 
-       /**
-        * @return the location of the subtask that runs this source reader.
-        */
-       public String getLocation() {
-               return location;
+       @Override
+       public String toString() {
+               return String.format("SourceEventWrapper[%s]", sourceEvent);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
new file mode 100644
index 0000000..7928535
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -0,0 +1,105 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class MockOperatorCoordinatorContext implements 
OperatorCoordinator.Context {
+       private final OperatorID operatorID;
+       private final int numSubtasks;
+       private final boolean failEventSending;
+
+       private final Map<Integer, List<OperatorEvent>> eventsToOperator;
+       private final LinkedHashMap<Integer, Throwable> failedTasks;
+       private boolean jobFailed;
+
+       public MockOperatorCoordinatorContext(OperatorID operatorID, int 
numSubtasks) {
+               this(operatorID, numSubtasks, true);
+       }
+
+       public MockOperatorCoordinatorContext(OperatorID operatorID, int 
numSubtasks, boolean failEventSending) {
+               this.operatorID = operatorID;
+               this.numSubtasks = numSubtasks;
+               this.eventsToOperator = new HashMap<>();
+               this.failedTasks = new LinkedHashMap<>();
+               this.jobFailed = false;
+               this.failEventSending = failEventSending;
+       }
+
+       @Override
+       public OperatorID getOperatorId() {
+               return operatorID;
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> sendEvent(
+                       OperatorEvent evt,
+                       int targetSubtask) throws TaskNotRunningException {
+               eventsToOperator.computeIfAbsent(targetSubtask, subtaskId -> 
new ArrayList<>()).add(evt);
+               if (failEventSending) {
+                       CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
+                       future.completeExceptionally(new 
FlinkRuntimeException("Testing Exception to fail event sending."));
+                       return future;
+               } else {
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               }
+       }
+
+       @Override
+       public void failTask(int subtask, Throwable cause) {
+               failedTasks.put(subtask, cause);
+       }
+
+       @Override
+       public void failJob(Throwable cause) {
+               jobFailed = true;
+       }
+
+       @Override
+       public int currentParallelism() {
+               return numSubtasks;
+       }
+
+       // -------------------------------
+
+       public List<OperatorEvent> getEventsToOperatorBySubtaskId(int 
subtaskId) {
+               return eventsToOperator.get(subtaskId);
+       }
+
+       public Map<Integer, List<OperatorEvent>> getEventsToOperator() {
+               return eventsToOperator;
+       }
+
+       public LinkedHashMap<Integer, Throwable> getFailedTasks() {
+               return failedTasks;
+       }
+
+       public boolean isJobFailed() {
+               return jobFailed;
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
new file mode 100644
index 0000000..c2d6fb8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/CoordinatorTestUtils.java
@@ -0,0 +1,84 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * A util class containing the helper methods for the coordinator tests.
+ */
+class CoordinatorTestUtils {
+
+       /**
+        * Create a SplitsAssignment. The assignment looks like the following:
+        * Subtask 0: Splits {0}
+        * Subtask 1: Splits {1, 2}
+        * Subtask 2: Splits {3, 4, 5}
+        */
+       static SplitsAssignment<MockSourceSplit> getSplitsAssignment(int 
numSubtasks, int startingSplitId) {
+               Map<Integer, List<MockSourceSplit>> assignments = new 
HashMap<>();
+               int splitId = startingSplitId;
+               for (int subtaskIndex = 0; subtaskIndex < numSubtasks; 
subtaskIndex++) {
+                       List<MockSourceSplit> subtaskAssignment = new 
ArrayList<>();
+                       for (int j = 0; j < subtaskIndex + 1; j++) {
+                               subtaskAssignment.add(new 
MockSourceSplit(splitId++));
+                       }
+                       assignments.put(subtaskIndex, subtaskAssignment);
+               }
+               return new SplitsAssignment<>(assignments);
+       }
+
+       /**
+        * Check that the actual assignment meets the expectation.
+        */
+       static void verifyAssignment(List<String> expectedSplitIds, 
Collection<MockSourceSplit> actualAssignment) {
+               assertEquals(expectedSplitIds.size(), actualAssignment.size());
+               int i = 0;
+               for (MockSourceSplit split : actualAssignment) {
+                       assertEquals(expectedSplitIds.get(i++), 
split.splitId());
+               }
+       }
+
+       static void verifyException(ThrowingRunnable<Throwable> runnable, 
String failureMessage, String errorMessage) {
+               try {
+                       runnable.run();
+                       fail(failureMessage);
+               } catch (Throwable t) {
+                       Throwable rootCause = t;
+                       while (rootCause.getCause() != null) {
+                               rootCause = rootCause.getCause();
+                       }
+                       assertThat(rootCause.getMessage(), 
Matchers.startsWith(errorMessage));
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
new file mode 100644
index 0000000..ba73ca0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -0,0 +1,188 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static 
org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
+import static 
org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static 
org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link SourceCoordinatorContext}.
+ */
+public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
+
+       @Test
+       public void testRegisterReader() {
+               List<ReaderInfo> readerInfo = registerReaders();
+
+               assertTrue(context.registeredReaders().containsKey(0));
+               assertTrue(context.registeredReaders().containsKey(1));
+               assertEquals(readerInfo.get(0), 
context.registeredReaders().get(0));
+               assertEquals(readerInfo.get(1), 
context.registeredReaders().get(1));
+       }
+
+       @Test
+       public void testUnregisterReader() {
+               List<ReaderInfo> readerInfo = registerReaders();
+               assertEquals(readerInfo.get(0), 
context.registeredReaders().get(0));
+
+               context.unregisterSourceReader(0);
+               assertEquals("Only readers 1 and 2 should remain registered.", 2, 
context.registeredReaders().size());
+               assertNull(context.registeredReaders().get(0));
+               assertEquals(readerInfo.get(1), 
context.registeredReaders().get(1));
+               assertEquals(readerInfo.get(2), 
context.registeredReaders().get(2));
+       }
+
+       @Test
+       public void testAssignSplitsFromCoordinatorExecutor() throws 
ExecutionException, InterruptedException {
+               testAssignSplits(true);
+       }
+
+       @Test
+       public void testAssignSplitsFromOtherThread() throws 
ExecutionException, InterruptedException {
+               testAssignSplits(false);
+       }
+
+       private void testAssignSplits(boolean fromCoordinatorExecutor) throws 
ExecutionException, InterruptedException {
+               // Register the readers.
+               registerReaders();
+
+               // Assign splits to the readers.
+               SplitsAssignment<MockSourceSplit> splitsAssignment = 
getSplitsAssignment(2, 0);
+               if (fromCoordinatorExecutor) {
+                       coordinatorExecutor.submit(() -> 
context.assignSplits(splitsAssignment)).get();
+               } else {
+                       context.assignSplits(splitsAssignment);
+               }
+
+               // The tracker should have recorded the assignments.
+               verifyAssignment(Arrays.asList("0"), 
splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
+               verifyAssignment(Arrays.asList("1", "2"), 
splitSplitAssignmentTracker.uncheckpointedAssignments().get(1));
+               // The OperatorCoordinatorContext should have received the 
event sending call.
+               assertEquals("There should be two events sent to the subtasks.",
+                               2, 
operatorCoordinatorContext.getEventsToOperator().size());
+
+               // Assert the events to subtask0.
+               List<OperatorEvent> eventsToSubtask0 = 
operatorCoordinatorContext.getEventsToOperatorBySubtaskId(0);
+               assertEquals(1, eventsToSubtask0.size());
+               OperatorEvent event = eventsToSubtask0.get(0);
+               assertTrue(event instanceof AddSplitEvent);
+               verifyAssignment(Arrays.asList("0"), ((AddSplitEvent) 
event).splits());
+       }
+
+       @Test
+       public void 
testAssignSplitToUnregisteredReaderFromCoordinatorExecutor() {
+               testAssignSplitToUnregisteredReader(true);
+       }
+
+       @Test
+       public void testAssignSplitToUnregisteredReaderFromOtherThread() {
+               testAssignSplitToUnregisteredReader(false);
+       }
+
+       private void testAssignSplitToUnregisteredReader(boolean 
fromCoordinatorExecutor) {
+               // Assign splits to the readers.
+               SplitsAssignment<MockSourceSplit> splitsAssignment = 
getSplitsAssignment(2, 0);
+               verifyException(
+                               () -> {
+                                       if (fromCoordinatorExecutor) {
+                                               coordinatorExecutor.submit(() 
-> context.assignSplits(splitsAssignment)).get();
+                                       } else {
+                                               
context.assignSplits(splitsAssignment);
+                                       }
+                               },
+                               "assignSplits() should fail to assign the 
splits to a reader that is not registered.",
+                               "Cannot assign splits");
+       }
+
+       @Test
+       public void testSnapshotAndRestore() throws Exception {
+               registerReaders();
+
+               // Assign splits to the readers.
+               SplitsAssignment<MockSourceSplit> splitsAssignment = 
getSplitsAssignment(2, 0);
+               coordinatorExecutor.submit(() -> 
context.assignSplits(splitsAssignment)).get();
+               // Take the first snapshot;
+               byte[] bytes = takeSnapshot(context, 100L);
+
+               SourceCoordinatorContext<MockSourceSplit> restoredContext;
+               SplitAssignmentTracker<MockSourceSplit> restoredTracker = new 
SplitAssignmentTracker<>();
+               SourceCoordinatorProvider.CoordinatorExecutorThreadFactory 
coordinatorThreadFactory =
+                               new 
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString());
+               try (ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes);
+                               DataInputStream in = new DataInputStream(bais)) 
{
+                       restoredContext = new SourceCoordinatorContext<>(
+                                       coordinatorExecutor,
+                                       coordinatorThreadFactory,
+                                       1,
+                                       operatorCoordinatorContext,
+                                       restoredTracker);
+                       restoredContext.restoreState(new 
MockSourceSplitSerializer(), in);
+               }
+               assertEquals(context.registeredReaders(), 
restoredContext.registeredReaders());
+               
assertEquals(splitSplitAssignmentTracker.uncheckpointedAssignments(), 
restoredTracker.uncheckpointedAssignments());
+               
assertEquals(splitSplitAssignmentTracker.assignmentsByCheckpointId(), 
restoredTracker.assignmentsByCheckpointId());
+
+       }
+
+       // ------------------------
+
+       private List<ReaderInfo> registerReaders() {
+               // Register the readers.
+               ReaderInfo readerInfo0 = new ReaderInfo(0, 
"subtask_0_location");
+               ReaderInfo readerInfo1 = new ReaderInfo(1, 
"subtask_1_location");
+               ReaderInfo readerInfo2 = new ReaderInfo(2, 
"subtask_1_location");
+               context.registerSourceReader(readerInfo0);
+               context.registerSourceReader(readerInfo1);
+               context.registerSourceReader(readerInfo2);
+               return Arrays.asList(readerInfo0, readerInfo1, readerInfo2);
+       }
+
+       private byte[] takeSnapshot(SourceCoordinatorContext<MockSourceSplit> 
context, long checkpointId) throws Exception {
+               byte[] bytes;
+               try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                               DataOutputStream out = new 
DataOutputViewStreamWrapper(baos)) {
+                       context.snapshotState(checkpointId, new 
MockSourceSplitSerializer(), out);
+                       out.flush();
+                       bytes = baos.toByteArray();
+               }
+               return bytes;
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
new file mode 100644
index 0000000..2f12235
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -0,0 +1,214 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static 
org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
+
+       @Test
+       public void testThrowExceptionWhenNotStarted() {
+               // The following methods should only be invoked after the 
source coordinator has started.
+               String failureMessage = "Call should fail when source 
coordinator has not started yet.";
+               verifyException(() -> 
sourceCoordinator.checkpointComplete(100L),
+                               failureMessage, "The coordinator has not 
started yet.");
+               verifyException(() -> 
sourceCoordinator.handleEventFromOperator(0, null),
+                               failureMessage, "The coordinator has not 
started yet.");
+               verifyException(() -> sourceCoordinator.subtaskFailed(0),
+                               failureMessage, "The coordinator has not 
started yet.");
+               verifyException(() -> 
sourceCoordinator.checkpointCoordinator(100L),
+                               failureMessage, "The coordinator has not 
started yet.");
+       }
+
+       @Test
+       public void testResetCheckpointAfterCoordinatorStarted() throws 
Exception {
+               // The following methods should only be invoked after the 
source coordinator has started.
+               sourceCoordinator.start();
+               verifyException(() -> sourceCoordinator.resetToCheckpoint(null),
+                               "Reset to checkpoint should fail after the 
coordinator has started",
+                               String.format("The coordinator for source %s 
has started. The source coordinator state can " +
+                                               "only be reset to a checkpoint 
before it starts.", OPERATOR_NAME));
+       }
+
+       @Test
+       public void testStart() throws Exception {
+               assertFalse(enumerator.started());
+               sourceCoordinator.start();
+               assertTrue(enumerator.started());
+       }
+
+       @Test
+       public void testClosed() throws Exception {
+               assertFalse(enumerator.closed());
+               sourceCoordinator.start();
+               sourceCoordinator.close();
+               assertTrue(enumerator.closed());
+       }
+
+       @Test
+       public void testReaderRegistration() throws Exception {
+               sourceCoordinator.start();
+               sourceCoordinator.handleEventFromOperator(
+                               0, new ReaderRegistrationEvent(0, 
"location_0"));
+               check(() -> {
+                       assertEquals("4 splits should remain unassigned after 2 
were assigned to reader 0",
+                                       4, 
enumerator.getUnassignedSplits().size());
+                       assertTrue(context.registeredReaders().containsKey(0));
+                       
assertTrue(enumerator.getHandledSourceEvent().isEmpty());
+                       verifyAssignment(Arrays.asList("0", "3"), 
splitSplitAssignmentTracker.uncheckpointedAssignments().get(0));
+               });
+       }
+
+       @Test
+       public void testHandleSourceEvent() throws Exception {
+               sourceCoordinator.start();
+               SourceEvent sourceEvent = new SourceEvent() {};
+               sourceCoordinator.handleEventFromOperator(0, new 
SourceEventWrapper(sourceEvent));
+               check(() -> {
+                       assertEquals(1, 
enumerator.getHandledSourceEvent().size());
+                       assertEquals(sourceEvent, 
enumerator.getHandledSourceEvent().get(0));
+               });
+       }
+
+       @Test
+       public void testCheckpointCoordinatorAndRestore() throws Exception {
+               sourceCoordinator.start();
+               sourceCoordinator.handleEventFromOperator(
+                               0, new ReaderRegistrationEvent(0, 
"location_0"));
+               byte[] bytes = 
sourceCoordinator.checkpointCoordinator(100L).get();
+
+               // restore from the checkpoints.
+               SourceCoordinator<?, ?> restoredCoordinator = 
getNewSourceCoordinator();
+               restoredCoordinator.resetToCheckpoint(bytes);
+               MockSplitEnumerator restoredEnumerator = (MockSplitEnumerator) 
restoredCoordinator.getEnumerator();
+               SourceCoordinatorContext restoredContext = 
restoredCoordinator.getContext();
+               assertEquals("4 splits should remain unassigned after 2 were assigned to reader 0",
+                               4, 
restoredEnumerator.getUnassignedSplits().size());
+               
assertTrue(restoredEnumerator.getHandledSourceEvent().isEmpty());
+               assertEquals(1, restoredContext.registeredReaders().size());
+               assertTrue(restoredContext.registeredReaders().containsKey(0));
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSubtaskFailedAndRevertUncompletedAssignments() throws 
Exception {
+               sourceCoordinator.start();
+
+               // Assign some splits to reader 0 then take snapshot 100.
+               sourceCoordinator.handleEventFromOperator(
+                               0, new ReaderRegistrationEvent(0, 
"location_0"));
+               sourceCoordinator.checkpointCoordinator(100L).get();
+
+               // Add split 6, assign it to reader 0 and take another snapshot 
101.
+               enumerator.addNewSplits(Collections.singletonList(new 
MockSourceSplit(6)));
+               sourceCoordinator.checkpointCoordinator(101L).get();
+
+               // check the state.
+               check(() -> {
+                       // There should be 4 unassigned splits.
+                       assertEquals(4, 
enumerator.getUnassignedSplits().size());
+                       verifyAssignment(
+                                       Arrays.asList("0", "3"),
+                                       
splitSplitAssignmentTracker.assignmentsByCheckpointId().get(100L).get(0));
+                       
assertTrue(splitSplitAssignmentTracker.uncheckpointedAssignments().isEmpty());
+                       verifyAssignment(Arrays.asList("0", "3"), 
splitSplitAssignmentTracker.assignmentsByCheckpointId(100L).get(0));
+                       verifyAssignment(Arrays.asList("6"), 
splitSplitAssignmentTracker.assignmentsByCheckpointId(101L).get(0));
+
+                       List<OperatorEvent> eventsToReader0 = 
operatorCoordinatorContext.getEventsToOperator().get(0);
+                       assertEquals(2, eventsToReader0.size());
+                       verifyAssignment(Arrays.asList("0", "3"), 
((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(0)).splits());
+                       verifyAssignment(Arrays.asList("6"), 
((AddSplitEvent<MockSourceSplit>) eventsToReader0.get(1)).splits());
+               });
+
+               // Fail reader 0.
+               sourceCoordinator.subtaskFailed(0);
+
+               // check the state again.
+               check(() -> {
+                       //
+                       assertFalse("Reader 0 should have been unregistered.",
+                                       
context.registeredReaders().containsKey(0));
+                       // The tracker should have reverted all the split 
assignments to reader 0.
+                       for (Map<Integer, ?> assignment : 
splitSplitAssignmentTracker.assignmentsByCheckpointId().values()) {
+                               assertFalse("Assignment in uncompleted 
checkpoint should have been reverted.",
+                                               assignment.containsKey(0));
+                       }
+                       
assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
+                       // The split enumerator should now contain the splits 
that used to be assigned to reader 0.
+                       assertEquals(7, 
enumerator.getUnassignedSplits().size());
+               });
+       }
+
+       @Test
+       public void testFailedSubtaskDoesNotRevertCompletedCheckpoint() throws 
Exception {
+               sourceCoordinator.start();
+
+               // Assign some splits to reader 0 then take snapshot 100.
+               sourceCoordinator.handleEventFromOperator(
+                               0, new ReaderRegistrationEvent(0, 
"location_0"));
+               sourceCoordinator.checkpointCoordinator(100L).get();
+               // Complete checkpoint 100.
+               sourceCoordinator.checkpointComplete(100L);
+
+               // Fail reader 0.
+               sourceCoordinator.subtaskFailed(0);
+
+               check(() -> {
+                       // Reader 0 has been unregistered.
+                       assertFalse(context.registeredReaders().containsKey(0));
+                       // The assigned splits are not reverted.
+                       assertEquals(4, 
enumerator.getUnassignedSplits().size());
+                       
assertFalse(splitSplitAssignmentTracker.uncheckpointedAssignments().containsKey(0));
+                       
assertTrue(splitSplitAssignmentTracker.assignmentsByCheckpointId().isEmpty());
+               });
+       }
+
+       // -------------------------------
+
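+       // Runs the given checks on the coordinator executor thread and fails the test if any exception is thrown.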
+       private void check(Runnable runnable) {
+               try {
+                       coordinatorExecutor.submit(runnable).get();
+               } catch (Exception e) {
+                       fail("Test failed due to " + e);
+               }
+       }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
new file mode 100644
index 0000000..80a73bb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -0,0 +1,86 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.mocks.MockSource;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * The test base for SourceCoordinator related tests.
+ */
+public abstract class SourceCoordinatorTestBase {
+       protected static final String OPERATOR_NAME = "TestOperator";
+       protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L);
+       protected static final int NUM_SUBTASKS = 3;
+
+       protected ExecutorService coordinatorExecutor;
+       protected MockOperatorCoordinatorContext operatorCoordinatorContext;
+       protected SplitAssignmentTracker<MockSourceSplit> splitSplitAssignmentTracker;
+       protected SourceCoordinatorContext<MockSourceSplit> context;
+       protected SourceCoordinator<?, ?> sourceCoordinator;
+       protected MockSplitEnumerator enumerator;
+
+       @Before
+       public void setup() {
+               operatorCoordinatorContext = new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS);
+               splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
+               String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
+               SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+                               new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName);
+               coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+               context = new SourceCoordinatorContext<>(
+                               coordinatorExecutor,
+                               coordinatorThreadFactory,
+                               1,
+                               operatorCoordinatorContext,
+                               splitSplitAssignmentTracker);
+               sourceCoordinator = getNewSourceCoordinator();
+               enumerator = (MockSplitEnumerator) sourceCoordinator.getEnumerator();
+       }
+
+       @After
+       public void cleanUp() throws InterruptedException, TimeoutException {
+               coordinatorExecutor.shutdown();
+               if (!coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                       throw new TimeoutException("Failed to close the CoordinatorExecutor before timeout.");
+               }
+       }
+
+       // --------------------------
+
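+       // Creates the SourceCoordinator under test, backed by a bounded MockSource.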
+       protected SourceCoordinator getNewSourceCoordinator() {
+               Source<Integer, MockSourceSplit, Set<MockSourceSplit>> mockSource =
+                               new MockSource(Boundedness.BOUNDED, NUM_SUBTASKS * 2);
+               return new SourceCoordinator<>(OPERATOR_NAME, coordinatorExecutor, mockSource, context);
+       }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
new file mode 100644
index 0000000..86ca76e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SplitAssignmentTrackerTest.java
@@ -0,0 +1,173 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.getSplitsAssignment;
+import static org.apache.flink.runtime.source.coordinator.CoordinatorTestUtils.verifyAssignment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for {@link SplitAssignmentTracker}.
+ */
+public class SplitAssignmentTrackerTest {
+
+       @Test
+       public void testRecordIncrementalSplitAssignment() {
+               SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+               tracker.recordSplitAssignment(getSplitsAssignment(3, 0));
+               tracker.recordSplitAssignment(getSplitsAssignment(2, 6));
+
+               verifyAssignment(Arrays.asList("0", "6"), tracker.uncheckpointedAssignments().get(0));
+               verifyAssignment(Arrays.asList("1", "2", "7", "8"), tracker.uncheckpointedAssignments().get(1));
+               verifyAssignment(Arrays.asList("3", "4", "5"), tracker.uncheckpointedAssignments().get(2));
+       }
+
+       @Test
+       public void testTakeSnapshot() throws Exception {
+               final long checkpointId = 123L;
+               SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+               tracker.recordSplitAssignment(getSplitsAssignment(3, 0));
+
+               // Serialize
+               takeSnapshot(tracker, checkpointId);
+
+               // Verify the uncheckpointed assignments.
+               assertTrue(tracker.uncheckpointedAssignments().isEmpty());
+
+               // Verify the assignments put into the checkpoints.
+               Map<Long, Map<Integer, LinkedHashSet<MockSourceSplit>>> assignmentsByCheckpoints =
+                               tracker.assignmentsByCheckpointId();
+               assertEquals(1, assignmentsByCheckpoints.size());
+
+               Map<Integer, LinkedHashSet<MockSourceSplit>> assignmentForCheckpoint = assignmentsByCheckpoints.get(checkpointId);
+               assertNotNull(assignmentForCheckpoint);
+
+               verifyAssignment(Arrays.asList("0"), assignmentForCheckpoint.get(0));
+               verifyAssignment(Arrays.asList("1", "2"), assignmentForCheckpoint.get(1));
+               verifyAssignment(Arrays.asList("3", "4", "5"), assignmentForCheckpoint.get(2));
+       }
+
+       @Test
+       public void testRestore() throws Exception {
+               final long checkpointId = 123L;
+               SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+               tracker.recordSplitAssignment(getSplitsAssignment(1, 0));
+
+               // Serialize
+               byte[] bytes = takeSnapshot(tracker, checkpointId);
+
+               // Deserialize
+               SplitAssignmentTracker<MockSourceSplit> deserializedTracker = restoreSnapshot(bytes);
+               // Verify the restore was successful.
+               assertEquals(deserializedTracker.assignmentsByCheckpointId(), tracker.assignmentsByCheckpointId());
+               assertEquals(deserializedTracker.uncheckpointedAssignments(), tracker.uncheckpointedAssignments());
+       }
+
+       @Test
+       public void testOnCheckpointComplete() throws Exception {
+               final long checkpointId1 = 100L;
+               final long checkpointId2 = 101L;
+               SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+               // Assign some splits to subtasks 0 and 1.
+               tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+
+               // Take the first snapshot.
+               takeSnapshot(tracker, checkpointId1);
+               verifyAssignment(Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
+               verifyAssignment(Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));
+
+               // Assign additional splits to subtasks 0 and 1.
+               tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+
+               // Take the second snapshot.
+               takeSnapshot(tracker, checkpointId2);
+               verifyAssignment(Arrays.asList("0"), tracker.assignmentsByCheckpointId(checkpointId1).get(0));
+               verifyAssignment(Arrays.asList("1", "2"), tracker.assignmentsByCheckpointId(checkpointId1).get(1));
+               verifyAssignment(Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
+               verifyAssignment(Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
+
+               // Complete the first checkpoint.
+               tracker.onCheckpointComplete(checkpointId1);
+               assertNull(tracker.assignmentsByCheckpointId(checkpointId1));
+               verifyAssignment(Arrays.asList("3"), tracker.assignmentsByCheckpointId(checkpointId2).get(0));
+               verifyAssignment(Arrays.asList("4", "5"), tracker.assignmentsByCheckpointId(checkpointId2).get(1));
+       }
+
+       @Test
+       public void testGetAndRemoveUncheckpointedAssignment() throws Exception {
+               final long checkpointId1 = 100L;
+               final long checkpointId2 = 101L;
+               SplitAssignmentTracker<MockSourceSplit> tracker = new SplitAssignmentTracker<>();
+
+               // Assign some splits and take snapshot 1.
+               tracker.recordSplitAssignment(getSplitsAssignment(2, 0));
+               takeSnapshot(tracker, checkpointId1);
+
+               // Assign some more splits and take snapshot 2.
+               tracker.recordSplitAssignment(getSplitsAssignment(2, 3));
+               takeSnapshot(tracker, checkpointId2);
+
+               // Now assume subtask 0 has failed.
+               List<MockSourceSplit> splitsToPutBack = tracker.getAndRemoveUncheckpointedAssignment(0);
+               verifyAssignment(Arrays.asList("0", "3"), splitsToPutBack);
+       }
+
+       // ---------------------
+
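+       // Serializes the tracker state for the given checkpoint into a byte array.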
+       private byte[] takeSnapshot(SplitAssignmentTracker<MockSourceSplit> tracker, long checkpointId) throws Exception {
+               byte[] bytes;
+               try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                               DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
+                       tracker.snapshotState(checkpointId, new MockSourceSplitSerializer(), out);
+                       out.flush();
+                       bytes = baos.toByteArray();
+               }
+               return bytes;
+       }
+
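+       // Restores a new SplitAssignmentTracker from the serialized snapshot bytes.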
+       private SplitAssignmentTracker<MockSourceSplit> restoreSnapshot(byte[] bytes) throws Exception {
+               SplitAssignmentTracker<MockSourceSplit> deserializedTracker;
+               try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+                               DataInputStream in = new DataInputViewStreamWrapper(bais)) {
+                       deserializedTracker = new SplitAssignmentTracker<>();
+                       deserializedTracker.restoreState(new MockSourceSplitSerializer(), in);
+               }
+               return deserializedTracker;
+       }
+}
