This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fc1ac427fad351b997e4219e3579910781c4fa8
Author: Jiangjie (Becket) Qin <jiangjie...@alibaba-inc.com>
AuthorDate: Tue Jun 9 23:35:22 2020 +0800

    [FLINK-10740][docs] Add documentation for FLIP-27 Source API and SplitReader API.
---
 docs/dev/stream/sources.md    | 238 +++++++++++++++++++++++++++++++++++++++---
 docs/dev/stream/sources.zh.md | 238 +++++++++++++++++++++++++++++++++++++++---
 docs/fig/source_reader.svg    |  20 ++++
 3 files changed, 472 insertions(+), 24 deletions(-)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 0257661..040318e 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -96,27 +96,241 @@ The source has a Kafka Topic (or list of Topics or Topic 
regex) and a *Deseriali
 
 Same as above, except that each Split (Topic Partition) has a defined end 
offset. Once the *SourceReader* reaches the end offset for a Split, it finishes 
that Split. Once all assigned Splits are finished, the SourceReader finishes.
 
-----
-----
-
 ## The Data Source API
+This section describes the major interfaces of the new Source API introduced in FLIP-27 and gives developers tips on implementing sources.
+
+### Source
+The 
[Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java)
 API is a factory style interface to create the following components.
+
+  - *Split Enumerator*
+  - *Source Reader*
+  - *Split Serializer*
+  - *Enumerator Checkpoint Serializer*
+  
+In addition to that, the Source provides the 
[boundedness](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java)
 attribute of the source, so that Flink can choose an appropriate mode to run the Flink jobs.
+
+`Source` implementations should be serializable, because `Source` instances are serialized and uploaded to the Flink cluster at runtime.
+
+### SplitEnumerator
+The 
[SplitEnumerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java)
 is expected to be the "brain" of the Source. Typical implementations of the 
`SplitEnumerator` do the following:
+
+  - `SourceReader` registration handling
+  - `SourceReader` failure handling
+    - The `addSplitsBack()` method will be invoked when a `SourceReader` 
fails. The SplitEnumerator should take back the split assignments that have not 
been acknowledged by the failed `SourceReader`.
+  - `SourceEvent` handling
+    - `SourceEvent`s are custom events sent between `SplitEnumerator` and 
`SourceReader`. The implementation can leverage this mechanism to perform 
sophisticated coordination.  
+  - Split discovery and assignment
+    - The `SplitEnumerator` can assign splits to the `SourceReader`s in 
response to various events, including discovery of new splits, new 
`SourceReader` registration, `SourceReader` failure, etc.
+
+A `SplitEnumerator` can accomplish the above work with the help of the 
[SplitEnumeratorContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java)
 which is provided to the `Source` on creation or restore of the 
`SplitEnumerator`. 
+The `SplitEnumeratorContext` allows a `SplitEnumerator` to retrieve necessary 
information of the readers and perform coordination actions.
+The `Source` implementation is expected to pass the `SplitEnumeratorContext` 
to the `SplitEnumerator` instance. 
+
+While a `SplitEnumerator` implementation can work well in a reactive way by only taking coordination actions when its methods are invoked, some `SplitEnumerator` implementations might want to take actions actively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReader`s.
+Such implementations may find the `callAsync()` method of the `SplitEnumeratorContext` handy. The code snippet below shows how the `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+class MySplitEnumerator implements SplitEnumerator<MySplit> {
+    private final long DISCOVER_INTERVAL = 60_000L;
+
+    /**
+     * A method to discover the splits.
+     */
+    private List<MySplit> discoverSplits() {...}
+    
+    @Override
+    public void start() {
+        ...
+        enumContext.callAsync(this::discoverSplits, splits -> {
+            Map<Integer, List<MySplit>> assignments = new HashMap<>();
+            int parallelism = enumContext.currentParallelism();
+            for (MySplit split : splits) {
+                // floorMod keeps the owner index non-negative for negative hash codes.
+                int owner = Math.floorMod(split.splitId().hashCode(), parallelism);
+                assignments.computeIfAbsent(owner, k -> new ArrayList<>()).add(split);
+            }
+            enumContext.assignSplits(new SplitsAssignment<>(assignments));
+        }, 0L, DISCOVER_INTERVAL);
+        ...
+    }
+    ...
+}
+{% endhighlight %}
+</div>
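
The hash-based assignment used in the callback above can be tried in isolation. The following is a minimal sketch that assumes splits are identified by plain `String` IDs; `SplitAssignmentSketch` and `assign` are hypothetical names for illustration and are not part of Flink. It uses `Math.floorMod` because Java's `%` operator returns a negative result for a negative `hashCode()`, which would not be a valid reader index.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Flink: groups split IDs by owning reader index. */
final class SplitAssignmentSketch {

    private SplitAssignmentSketch() {}

    /**
     * Maps each split ID to a reader index in [0, parallelism).
     * Math.floorMod keeps the index non-negative even when hashCode() is negative.
     */
    static Map<Integer, List<String>> assign(List<String> splitIds, int parallelism) {
        Map<Integer, List<String>> assignments = new HashMap<>();
        for (String splitId : splitIds) {
            int owner = Math.floorMod(splitId.hashCode(), parallelism);
            assignments.computeIfAbsent(owner, k -> new ArrayList<>()).add(splitId);
        }
        return assignments;
    }
}
```

In a real enumerator the resulting map would be wrapped in a `SplitsAssignment` and handed to the context, as in the snippet above.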
+
+### SourceReader
+
+The 
[SourceReader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java)
 is a component running in the Task Managers to consume the records from the 
Splits. 
+
+The `SourceReader` exposes a pull-based consumption interface. A Flink task 
keeps calling `pollNext(ReaderOutput)` in a loop to poll records from the 
`SourceReader`. The return value of the `pollNext(ReaderOutput)` method 
indicates the status of the source reader.
 
-Source, SourceEnumerator, SourceReader.
+  - `MORE_AVAILABLE` - The SourceReader has more records available immediately.
+  - `NOTHING_AVAILABLE` - The SourceReader does not have more records 
available at this point, but may have more records in the future.
+  - `END_OF_INPUT` - The SourceReader has exhausted all the records and 
reached the end of data. This means the SourceReader can be closed.
 
-env.continuousSource(...).
+In the interest of performance, a `ReaderOutput` is provided to the `pollNext(ReaderOutput)` method, so a `SourceReader` can emit multiple records in a single call of `pollNext()` if it has to. For example, sometimes the external system works at the granularity of blocks. A block may contain multiple records but the source can only checkpoint at the block boundaries. In this case the `SourceReader` can emit one block at a time to the `ReaderOutput`.
+**However, the `SourceReader` implementation should avoid emitting multiple 
records in a single `pollNext(ReaderOutput)` invocation unless necessary.** 
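
The pull-based contract described above can be sketched without any Flink dependency. In the following illustration, `PollStatus` is a stand-in for the three statuses listed above, a plain `List<String>` stands in for the `ReaderOutput`, and `QueueReaderSketch` is a hypothetical in-memory reader; none of these are Flink classes. The reader emits at most one record per call, following the recommendation above.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Stand-in for the three statuses described above; not Flink's actual enum. */
enum PollStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

/** Hypothetical reader over an in-memory queue that emits one record per poll. */
final class QueueReaderSketch {
    private final Queue<String> buffered = new ArrayDeque<>();
    private boolean noMoreInput;

    /** Simulates a record arriving from the external system. */
    void add(String record) {
        buffered.add(record);
    }

    /** Simulates the external system signalling that no further records will arrive. */
    void signalNoMoreInput() {
        noMoreInput = true;
    }

    /** Emits at most one record into the output and reports what the caller can expect next. */
    PollStatus pollNext(List<String> output) {
        String next = buffered.poll();
        if (next != null) {
            output.add(next);
        }
        if (!buffered.isEmpty()) {
            return PollStatus.MORE_AVAILABLE;
        }
        return noMoreInput ? PollStatus.END_OF_INPUT : PollStatus.NOTHING_AVAILABLE;
    }
}
```

A caller would keep polling while the status is `MORE_AVAILABLE`, wait for new data on `NOTHING_AVAILABLE`, and close the reader on `END_OF_INPUT`.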
 
-----
-----
+All the state of a `SourceReader` should be maintained inside the `SourceSplit`s which are returned by the `snapshotState()` invocation. Doing this allows the `SourceSplit`s to be reassigned to other `SourceReader`s when needed.
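
One way to read the guideline above is that a split carries its own read position, so a snapshot is just the list of splits. The following is a minimal sketch under that assumption; `OffsetSplit` and `OffsetTrackingReaderSketch` are hypothetical classes for illustration, and the offset-per-split model is an assumption rather than something Flink prescribes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical immutable split: an ID plus the next offset to read. */
final class OffsetSplit {
    final String splitId;
    final long nextOffset;

    OffsetSplit(String splitId, long nextOffset) {
        this.splitId = splitId;
        this.nextOffset = nextOffset;
    }
}

/** Hypothetical reader that keeps all of its progress as per-split offsets. */
final class OffsetTrackingReaderSketch {
    private final Map<String, Long> nextOffsets = new LinkedHashMap<>();

    /** Starts (or resumes) reading each split from the offset stored in the split. */
    void addSplits(List<OffsetSplit> splits) {
        for (OffsetSplit split : splits) {
            nextOffsets.put(split.splitId, split.nextOffset);
        }
    }

    /** Advances the offset of an assigned split by one emitted record. */
    void recordEmitted(String splitId) {
        nextOffsets.computeIfPresent(splitId, (id, offset) -> offset + 1);
    }

    /** Returns the splits with their current offsets; nothing else needs to be checkpointed. */
    List<OffsetSplit> snapshotState() {
        List<OffsetSplit> snapshot = new ArrayList<>();
        nextOffsets.forEach((id, offset) -> snapshot.add(new OffsetSplit(id, offset)));
        return snapshot;
    }
}
```

Because the snapshot consists only of splits, handing it to a different reader instance through `addSplits` resumes reading at the same position.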
+
+A `SourceReaderContext` is provided to the `Source` when a `SourceReader` is created. It is expected that the `Source` will pass the context to the `SourceReader` instance. Through the context, the `SourceReader` can send `SourceEvent`s to its `SplitEnumerator`. A typical design pattern for the `Source` is letting the `SourceReader`s report their local information to the `SplitEnumerator`, which has a global view and makes the decisions.
+
+The `SourceReader` API is a low-level API that allows users to deal with the splits manually and have their own threading model to fetch and hand over the records. To facilitate the `SourceReader` implementation, Flink has provided a 
[SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java)
 class which significantly reduces the amount of work needed to write a `Sou 
[...]
+**Connector developers are strongly encouraged to build on the `SourceReaderBase` instead of writing `SourceReader`s from scratch**. For more details please check the [Split Reader API](#the-split-reader-api) section.
+
+### Use the Source
+In order to create a `DataStream` from a `Source`, one needs to pass the 
`Source` to a `StreamExecutionEnvironment`. For example,
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+Source mySource = new MySource(...);
+
+DataStream<Integer> stream = env.continuousSource(
+        mySource,
+        WatermarkStrategy.noWatermarks(),
+        "MySourceName");
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val mySource = new MySource(...)
+
+val stream = env.continuousSource(
+      mySource,
+      WatermarkStrategy.noWatermarks(),
+      "MySourceName")
+...
+{% endhighlight %}
+</div>
+</div>
 
 ## The Split Reader API
+The core SourceReader API is asynchronous and rather low level, leaving users to handle splits manually.
+In practice, many sources have to perform blocking `poll()` and I/O operations, which need separate threads.
+[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java)
 is the high-level API for such sources.
+The `SplitReader` API is designed to work together with the `SourceReaderBase` class, which is a base implementation for the `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+### SplitReader
+The `SplitReader` API only has three methods:
+  - A blocking fetch method to return a 
[RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java)
+  - A non-blocking method to handle split changes.
+  - A non-blocking wake up method to wake up the blocking fetch.
+
+The `SplitReader` only focuses on reading the records from the external system and is therefore much simpler than the `SourceReader`.
+Please check the Javadoc of the class for more details.
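
The interplay of the three methods listed above, in particular how a non-blocking wake-up releases a blocking fetch, can be sketched with a plain `BlockingQueue`. `BlockingSplitReaderSketch` below is a hypothetical stand-in, not Flink's `SplitReader`: its method names, the `String` records and split IDs, and the use of an empty `Optional` as a wake-up marker are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in that mirrors the three kinds of methods described above. */
final class BlockingSplitReaderSketch {
    // An empty Optional in the queue acts as the wake-up marker.
    private final BlockingQueue<Optional<String>> incoming = new LinkedBlockingQueue<>();
    private final List<String> assignedSplits = new CopyOnWriteArrayList<>();

    /** Simulates the external system delivering a record. */
    void deliver(String record) {
        incoming.add(Optional.of(record));
    }

    /** Blocking fetch: waits for a record or a wake-up and returns what is available. */
    List<String> fetch() {
        List<String> batch = new ArrayList<>();
        try {
            // Block until either a record or a wake-up marker arrives.
            incoming.take().ifPresent(batch::add);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return batch;
        }
        // Drain whatever else is already queued without blocking again.
        Optional<String> next;
        while ((next = incoming.poll()) != null) {
            next.ifPresent(batch::add);
        }
        return batch;
    }

    /** Non-blocking: records newly assigned splits. */
    void handleSplitsChanges(List<String> addedSplitIds) {
        assignedSplits.addAll(addedSplitIds);
    }

    /** Non-blocking: releases a fetch() that is (or will be) blocked. */
    void wakeUp() {
        incoming.add(Optional.empty());
    }

    /** Returns a copy of the currently assigned split IDs. */
    List<String> assignedSplits() {
        return new ArrayList<>(assignedSplits);
    }
}
```

A fetcher thread would call `fetch()` in a loop, while another thread calls `wakeUp()` and then `handleSplitsChanges(...)` when the assignment changes.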
+
+### SourceReaderBase
+It is quite common that a `SourceReader` implementation does the following:
+
+  - Have a pool of threads fetching from splits of the external system in a 
blocking way.
+  - Handle the synchronization between the fetching threads mentioned above 
and the `pollNext(ReaderOutput)` invocation.
+  - Maintain the per split watermark for watermark alignment.
+  - Maintain the state of each split for checkpointing.
+  
+In order to reduce the work of writing a new `SourceReader`, Flink provides a 
[SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java)
 class to serve as a base implementation of the `SourceReader`. 
+`SourceReaderBase` has all the above work done out of the box. To write a new 
`SourceReader`, one can just let the `SourceReader` implementation inherit from 
the `SourceReaderBase`, fill in a few methods and implement a high level 
[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
 
+### SplitFetcherManager
+The `SourceReaderBase` supports a few threading models out of the box, 
depending on the behavior of the 
[SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java)
 it works with.
+The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s 
each fetching with a `SplitReader`. It also determines how to assign splits to 
each split fetcher.
 
-core SourceReader API is asynchronous and rather low level, leavers users to 
handle splits manually.
-in practice, many sources have perform blocking poll() and I/O operations, 
needing separate threads.
-Split Reader is base implementation for that.
+As an example, as illustrated below, a `SplitFetcherManager` may have a fixed 
number of threads, each fetching from some splits assigned to the 
`SourceReader`.
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_reader.svg" alt="One 
fetcher per split threading model." />
+</div>
+The following code snippet implements this threading model.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * A SplitFetcherManager that has a fixed number of split fetchers and assigns splits
+ * to the split fetchers based on the hash code of split IDs.
+ */
+public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
+        extends SplitFetcherManager<E, SplitT> {
+    private final int numFetchers;
+
+    public FixedSizeSplitFetcherManager(
+            int numFetchers,
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+        super(futureNotifier, elementsQueue, splitReaderSupplier);
+        this.numFetchers = numFetchers;
+        // Create numFetchers split fetchers.
+        for (int i = 0; i < numFetchers; i++) {
+            startFetcher(createSplitFetcher());
+        }
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splitsToAdd) {
+        // Group splits by their owner fetchers.
+        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
+        splitsToAdd.forEach(split -> {
+            int ownerFetcherIndex = Math.floorMod(split.splitId().hashCode(), numFetchers);
+            splitsByFetcherIndex
+                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
+                    .add(split);
+        });
+        // Assign the splits to their owner fetcher.
+        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
+            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
+        });
+    }
+}
+{% endhighlight %}
+</div>
+
+A `SourceReader` using this threading model can be created as follows:
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, 
SplitStateT>
+        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+    public FixedFetcherSizeSourceReader(
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(
+                futureNotifier,
+                elementsQueue,
+                new FixedSizeSplitFetcherManager<>(
+                        config.getInteger(SourceConfig.NUM_FETCHERS),
+                        futureNotifier,
+                        elementsQueue,
+                        splitFetcherSupplier),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    protected void onSplitFinished(Collection<String> finishedSplitIds) {
+        // Do something in the callback for the finished splits.
+    }
+
+    @Override
+    protected SplitStateT initializedState(SplitT split) {
+        ...
+    }
+
+    @Override
+    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
+        ...
+    }
+}
+{% endhighlight %}
+</div>
 
-----
-----
+`SourceReader` implementations can also implement their own threading models on top of the `SplitFetcherManager` and `SourceReaderBase`.
 
 ## Event Time and Watermarks
 
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 0257661..040318e 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -96,27 +96,241 @@ The source has a Kafka Topic (or list of Topics or Topic 
regex) and a *Deseriali
 
 Same as above, except that each Split (Topic Partition) has a defined end 
offset. Once the *SourceReader* reaches the end offset for a Split, it finishes 
that Split. Once all assigned Splits are finished, the SourceReader finishes.
 
-----
-----
-
 ## The Data Source API
+This section describes the major interfaces of the new Source API introduced in FLIP-27 and gives developers tips on implementing sources.
+
+### Source
+The 
[Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java)
 API is a factory style interface to create the following components.
+
+  - *Split Enumerator*
+  - *Source Reader*
+  - *Split Serializer*
+  - *Enumerator Checkpoint Serializer*
+  
+In addition to that, the Source provides the 
[boundedness](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java)
 attribute of the source, so that Flink can choose an appropriate mode to run the Flink jobs.
+
+`Source` implementations should be serializable, because `Source` instances are serialized and uploaded to the Flink cluster at runtime.
+
+### SplitEnumerator
+The 
[SplitEnumerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java)
 is expected to be the "brain" of the Source. Typical implementations of the 
`SplitEnumerator` do the following:
+
+  - `SourceReader` registration handling
+  - `SourceReader` failure handling
+    - The `addSplitsBack()` method will be invoked when a `SourceReader` 
fails. The SplitEnumerator should take back the split assignments that have not 
been acknowledged by the failed `SourceReader`.
+  - `SourceEvent` handling
+    - `SourceEvent`s are custom events sent between `SplitEnumerator` and 
`SourceReader`. The implementation can leverage this mechanism to perform 
sophisticated coordination.  
+  - Split discovery and assignment
+    - The `SplitEnumerator` can assign splits to the `SourceReader`s in 
response to various events, including discovery of new splits, new 
`SourceReader` registration, `SourceReader` failure, etc.
+
+A `SplitEnumerator` can accomplish the above work with the help of the 
[SplitEnumeratorContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java)
 which is provided to the `Source` on creation or restore of the 
`SplitEnumerator`. 
+The `SplitEnumeratorContext` allows a `SplitEnumerator` to retrieve necessary 
information of the readers and perform coordination actions.
+The `Source` implementation is expected to pass the `SplitEnumeratorContext` 
to the `SplitEnumerator` instance. 
+
+While a `SplitEnumerator` implementation can work well in a reactive way by only taking coordination actions when its methods are invoked, some `SplitEnumerator` implementations might want to take actions actively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReader`s.
+Such implementations may find the `callAsync()` method of the `SplitEnumeratorContext` handy. The code snippet below shows how the `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+class MySplitEnumerator implements SplitEnumerator<MySplit> {
+    private final long DISCOVER_INTERVAL = 60_000L;
+
+    /**
+     * A method to discover the splits.
+     */
+    private List<MySplit> discoverSplits() {...}
+    
+    @Override
+    public void start() {
+        ...
+        enumContext.callAsync(this::discoverSplits, splits -> {
+            Map<Integer, List<MySplit>> assignments = new HashMap<>();
+            int parallelism = enumContext.currentParallelism();
+            for (MySplit split : splits) {
+                // floorMod keeps the owner index non-negative for negative hash codes.
+                int owner = Math.floorMod(split.splitId().hashCode(), parallelism);
+                assignments.computeIfAbsent(owner, k -> new ArrayList<>()).add(split);
+            }
+            enumContext.assignSplits(new SplitsAssignment<>(assignments));
+        }, 0L, DISCOVER_INTERVAL);
+        ...
+    }
+    ...
+}
+{% endhighlight %}
+</div>
+
+### SourceReader
+
+The 
[SourceReader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java)
 is a component running in the Task Managers to consume the records from the 
Splits. 
+
+The `SourceReader` exposes a pull-based consumption interface. A Flink task 
keeps calling `pollNext(ReaderOutput)` in a loop to poll records from the 
`SourceReader`. The return value of the `pollNext(ReaderOutput)` method 
indicates the status of the source reader.
 
-Source, SourceEnumerator, SourceReader.
+  - `MORE_AVAILABLE` - The SourceReader has more records available immediately.
+  - `NOTHING_AVAILABLE` - The SourceReader does not have more records 
available at this point, but may have more records in the future.
+  - `END_OF_INPUT` - The SourceReader has exhausted all the records and 
reached the end of data. This means the SourceReader can be closed.
 
-env.continuousSource(...).
+In the interest of performance, a `ReaderOutput` is provided to the `pollNext(ReaderOutput)` method, so a `SourceReader` can emit multiple records in a single call of `pollNext()` if it has to. For example, sometimes the external system works at the granularity of blocks. A block may contain multiple records but the source can only checkpoint at the block boundaries. In this case the `SourceReader` can emit one block at a time to the `ReaderOutput`.
+**However, the `SourceReader` implementation should avoid emitting multiple 
records in a single `pollNext(ReaderOutput)` invocation unless necessary.** 
 
-----
-----
+All the state of a `SourceReader` should be maintained inside the `SourceSplit`s which are returned by the `snapshotState()` invocation. Doing this allows the `SourceSplit`s to be reassigned to other `SourceReader`s when needed.
+
+A `SourceReaderContext` is provided to the `Source` when a `SourceReader` is created. It is expected that the `Source` will pass the context to the `SourceReader` instance. Through the context, the `SourceReader` can send `SourceEvent`s to its `SplitEnumerator`. A typical design pattern for the `Source` is letting the `SourceReader`s report their local information to the `SplitEnumerator`, which has a global view and makes the decisions.
+
+The `SourceReader` API is a low-level API that allows users to deal with the splits manually and have their own threading model to fetch and hand over the records. To facilitate the `SourceReader` implementation, Flink has provided a 
[SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java)
 class which significantly reduces the amount of work needed to write a `Sou 
[...]
+**Connector developers are strongly encouraged to build on the `SourceReaderBase` instead of writing `SourceReader`s from scratch**. For more details please check the [Split Reader API](#the-split-reader-api) section.
+
+### Use the Source
+In order to create a `DataStream` from a `Source`, one needs to pass the 
`Source` to a `StreamExecutionEnvironment`. For example,
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+Source mySource = new MySource(...);
+
+DataStream<Integer> stream = env.continuousSource(
+        mySource,
+        WatermarkStrategy.noWatermarks(),
+        "MySourceName");
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val mySource = new MySource(...)
+
+val stream = env.continuousSource(
+      mySource,
+      WatermarkStrategy.noWatermarks(),
+      "MySourceName")
+...
+{% endhighlight %}
+</div>
+</div>
 
 ## The Split Reader API
+The core SourceReader API is asynchronous and rather low level, leaving users to handle splits manually.
+In practice, many sources have to perform blocking `poll()` and I/O operations, which need separate threads.
+[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java)
 is the high-level API for such sources.
+The `SplitReader` API is designed to work together with the `SourceReaderBase` class, which is a base implementation for the `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+### SplitReader
+The `SplitReader` API only has three methods:
+  - A blocking fetch method to return a 
[RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java)
+  - A non-blocking method to handle split changes.
+  - A non-blocking wake up method to wake up the blocking fetch.
+
+The `SplitReader` only focuses on reading the records from the external system and is therefore much simpler than the `SourceReader`.
+Please check the Javadoc of the class for more details.
+
+### SourceReaderBase
+It is quite common that a `SourceReader` implementation does the following:
+
+  - Have a pool of threads fetching from splits of the external system in a 
blocking way.
+  - Handle the synchronization between the fetching threads mentioned above 
and the `pollNext(ReaderOutput)` invocation.
+  - Maintain the per split watermark for watermark alignment.
+  - Maintain the state of each split for checkpointing.
+  
+In order to reduce the work of writing a new `SourceReader`, Flink provides a 
[SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java)
 class to serve as a base implementation of the `SourceReader`. 
+`SourceReaderBase` has all the above work done out of the box. To write a new 
`SourceReader`, one can just let the `SourceReader` implementation inherit from 
the `SourceReaderBase`, fill in a few methods and implement a high level 
[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
 
+### SplitFetcherManager
+The `SourceReaderBase` supports a few threading models out of the box, 
depending on the behavior of the 
[SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java)
 it works with.
+The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s 
each fetching with a `SplitReader`. It also determines how to assign splits to 
each split fetcher.
 
-core SourceReader API is asynchronous and rather low level, leavers users to 
handle splits manually.
-in practice, many sources have perform blocking poll() and I/O operations, 
needing separate threads.
-Split Reader is base implementation for that.
+As an example, as illustrated below, a `SplitFetcherManager` may have a fixed 
number of threads, each fetching from some splits assigned to the 
`SourceReader`.
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_reader.svg" alt="One 
fetcher per split threading model." />
+</div>
+The following code snippet implements this threading model.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * A SplitFetcherManager that has a fixed number of split fetchers and assigns splits
+ * to the split fetchers based on the hash code of split IDs.
+ */
+public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
+        extends SplitFetcherManager<E, SplitT> {
+    private final int numFetchers;
+
+    public FixedSizeSplitFetcherManager(
+            int numFetchers,
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+        super(futureNotifier, elementsQueue, splitReaderSupplier);
+        this.numFetchers = numFetchers;
+        // Create numFetchers split fetchers.
+        for (int i = 0; i < numFetchers; i++) {
+            startFetcher(createSplitFetcher());
+        }
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splitsToAdd) {
+        // Group splits by their owner fetchers.
+        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
+        splitsToAdd.forEach(split -> {
+            int ownerFetcherIndex = Math.floorMod(split.splitId().hashCode(), numFetchers);
+            splitsByFetcherIndex
+                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
+                    .add(split);
+        });
+        // Assign the splits to their owner fetcher.
+        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
+            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
+        });
+    }
+}
+{% endhighlight %}
+</div>
+
+A `SourceReader` using this threading model can be created as follows:
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, 
SplitStateT>
+        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+    public FixedFetcherSizeSourceReader(
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(
+                futureNotifier,
+                elementsQueue,
+                new FixedSizeSplitFetcherManager<>(
+                        config.getInteger(SourceConfig.NUM_FETCHERS),
+                        futureNotifier,
+                        elementsQueue,
+                        splitFetcherSupplier),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    protected void onSplitFinished(Collection<String> finishedSplitIds) {
+        // Do something in the callback for the finished splits.
+    }
+
+    @Override
+    protected SplitStateT initializedState(SplitT split) {
+        ...
+    }
+
+    @Override
+    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
+        ...
+    }
+}
+{% endhighlight %}
+</div>
 
-----
-----
+`SourceReader` implementations can also implement their own threading models on top of the `SplitFetcherManager` and `SourceReaderBase`.
 
 ## Event Time and Watermarks
 
diff --git a/docs/fig/source_reader.svg b/docs/fig/source_reader.svg
new file mode 100644
index 0000000..1d4d9ac
--- /dev/null
+++ b/docs/fig/source_reader.svg
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<svg width="1257" height="653" xmlns="http://www.w3.org/2000/svg" 
xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs><clipPath 
id="clip0"><path d="M-11 41 1246 41 1246 694-11 694Z" fill-rule="evenodd" 
clip-rule="evenodd"/></clipPath><clipPath id="clip1"><path d="M1017 309 1130 
309 1130 420 1017 420Z" fill-rule="evenodd" 
clip-rule="evenodd"/></clipPath><clipPath id="clip2"><path d="M1017 309 1130 
309 1130 420 1017 420Z" fill-rule="evenodd" clip-rule="evenodd"/></clipPath>< 
[...]
\ No newline at end of file
