dawidwys commented on a change in pull request #15526:
URL: https://github.com/apache/flink/pull/15526#discussion_r609847462



##########
File path: docs/content.zh/docs/dev/datastream/sources.md
##########
@@ -0,0 +1,386 @@
+---
+title: "Data Sources"
+weight: 11
+type: docs
+aliases:
+  - /zh/dev/stream/sources.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Data Sources
+
+
+{{< hint warning >}}
+**Note**: This describes the new Data Source API, introduced in Flink 1.11 as part of [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+This new API is currently in **BETA** status.
+
+ Most of the existing source connectors are not yet (as of Flink 1.11) implemented using this new API,
+ but using the previous API, based on [SourceFunction](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java).
+{{< /hint >}}
+
+This page describes Flink's Data Source API and the concepts and architecture 
behind it.
+**Read this if you are interested in how data sources in Flink work, or if you want to implement a new Data Source.**
+
+If you are looking for pre-defined source connectors, please check the 
[Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}).
+
+
+## Data Source Concepts
+
+**Core Components**
+
+A Data Source has three core components: *Splits*, the *SplitEnumerator*, and 
the *SourceReader*.
+
+  - A **Split** is a portion of data consumed by the source, like a file or a log partition. Splits are the granularity by which the source distributes the work and parallelizes the data reading.
+
+  - The **SourceReader** requests *Splits* and processes them, for example by reading the file or log partition represented by the *Split*. The *SourceReaders* run in parallel on the Task Managers in the `SourceOperators` and produce the parallel stream of events/records.
+
+  - The **SplitEnumerator** generates the *Splits* and assigns them to the 
*SourceReaders*. It runs as a single instance on the Job Manager and is 
responsible for maintaining the backlog of pending *Splits* and assigning them 
to the readers in a balanced manner.
+  
+The [Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java) interface is the API entry point that ties the above three components together.
+
+{{< img src="fig/source_components.svg" alt="Illustration of SplitEnumerator and SourceReader interacting" width="70%" >}}
+
+
+**Unified Across Streaming and Batch**
+
+The Data Source API supports both unbounded streaming sources and bounded 
batch sources, in a unified way.
+
+The difference between the two cases is minimal: In the bounded/batch case, the enumerator generates a fixed set of splits, and each split is necessarily finite. In the unbounded streaming case, one of the two is not true (splits are not finite, or the enumerator keeps generating new splits).
+
+#### Examples
+
+Here are some simplified conceptual examples to illustrate how the data source 
components interact, in streaming and batch cases.
+
+*Note that this does not accurately describe how the Kafka and File source implementations work; parts are simplified, for illustrative purposes.*
+
+**Bounded File Source**
+
+The source has the URI/Path of a directory to read, and a *Format* that 
defines how to parse the files.
+
+  - A *Split* is a file, or a region of a file (if the data format supports 
splitting the file).
+  - The *SplitEnumerator* lists all files under the given directory path. It 
assigns Splits to the next reader that requests a Split. Once all Splits are 
assigned, it responds to requests with *NoMoreSplits*.
+  - The *SourceReader* requests a Split and reads the assigned Split (file or 
file region) and parses it using the given Format. If it does not get another 
Split, but a *NoMoreSplits* message, it finishes.
+
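The bounded file source interaction above can be sketched as a small stand-alone model. This is only an illustration under stated assumptions: it does not use any Flink classes, and the names `BoundedFileSourceSketch`, `Enumerator`, `nextSplit`, and `runReader` are hypothetical. An empty `Optional` stands in for the *NoMoreSplits* response.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Stand-alone model of the bounded file source; not Flink API, all names are hypothetical.
final class BoundedFileSourceSketch {

    /** Enumerator side: holds a fixed backlog of splits (here: file names). */
    static final class Enumerator {
        private final Queue<String> pending;

        Enumerator(List<String> files) {
            this.pending = new ArrayDeque<>(files);
        }

        /** Returns the next split, or an empty Optional to signal "NoMoreSplits". */
        Optional<String> nextSplit() {
            return Optional.ofNullable(pending.poll());
        }
    }

    /** Reader side: requests splits one by one and finishes on "NoMoreSplits". */
    static List<String> runReader(Enumerator enumerator) {
        List<String> processed = new ArrayList<>();
        Optional<String> split = enumerator.nextSplit();
        while (split.isPresent()) {
            // Parsing with the Format is reduced to tagging the file name.
            processed.add("read:" + split.get());
            split = enumerator.nextSplit();
        }
        return processed;
    }

    public static void main(String[] args) {
        Enumerator enumerator = new Enumerator(Arrays.asList("a.csv", "b.csv"));
        System.out.println(runReader(enumerator));
    }
}
```

In this model the unbounded streaming variant would differ only in the enumerator: it would keep adding newly discovered files to the backlog instead of ever signalling *NoMoreSplits*.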
+**Unbounded Streaming File Source**
+
+This source works the same way as described above, except that the 
*SplitEnumerator* never responds with *NoMoreSplits* and periodically lists the 
contents under the given URI/Path to check for new files. Once it finds new 
files, it generates new Splits for them and can assign them to the available 
SourceReaders.
+
+**Unbounded Streaming Kafka Source**
+
+The source has a Kafka Topic (or list of Topics or Topic regex) and a 
*Deserializer* to parse the records.
+
+  - A *Split* is a Kafka Topic Partition.
+  - The *SplitEnumerator* connects to the brokers to list all topic partitions 
involved in the subscribed topics. The enumerator can optionally repeat this 
operation to discover newly added topics/partitions.
+  - The *SourceReader* reads the assigned Splits (Topic Partitions) using the 
KafkaConsumer and deserializes the records using the provided Deserializer. The 
splits (Topic Partitions) do not have an end, so the reader never reaches the 
end of the data.
+
+**Bounded Kafka Source**
+
+Same as above, except that each Split (Topic Partition) has a defined end 
offset. Once the *SourceReader* reaches the end offset for a Split, it finishes 
that Split. Once all assigned Splits are finished, the SourceReader finishes.
+
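The bounded Kafka case above can be modelled in the same simplified spirit. The sketch below assumes a hypothetical `PartitionSplit` type that carries its own read position and an exclusive end offset; it uses no Kafka or Flink classes, and the round-robin reading order is an arbitrary choice made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone model of a bounded partition reader; not Flink or Kafka API.
final class BoundedPartitionReaderSketch {

    /** Split state: partition name, next offset to read, exclusive end offset. */
    static final class PartitionSplit {
        final String partition;
        long nextOffset;
        final long endOffset;

        PartitionSplit(String partition, long startOffset, long endOffset) {
            this.partition = partition;
            this.nextOffset = startOffset;
            this.endOffset = endOffset;
        }

        boolean isFinished() {
            return nextOffset >= endOffset;
        }
    }

    /** Reads one record per unfinished split per round until every split is finished. */
    static List<String> readAll(List<PartitionSplit> splits) {
        List<String> records = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (PartitionSplit split : splits) {
                if (!split.isFinished()) {
                    records.add(split.partition + "@" + split.nextOffset);
                    split.nextOffset++;
                    progressed = true;
                }
            }
        }
        // All assigned splits reached their end offset, so the reader finishes.
        return records;
    }
}
```

Removing the end offset (so that `isFinished()` never returns true) turns this model into the unbounded case, where the reader never reaches the end of the data.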
+## The Data Source API
+This section describes the major interfaces of the new Source API introduced in FLIP-27, and provides tips for developers on source development.
+
+### Source
+The 
[Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java)
 API is a factory style interface to create the following components.
+
+  - *Split Enumerator*
+  - *Source Reader*
+  - *Split Serializer*
+  - *Enumerator Checkpoint Serializer*
+  
+In addition to that, the Source provides the [boundedness](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java) attribute of the source, so that Flink can choose an appropriate mode to run the Flink jobs.
+
+The Source implementations should be serializable as the Source instances are 
serialized and uploaded to the Flink cluster at runtime.
+
+### SplitEnumerator
+The 
[SplitEnumerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java)
 is expected to be the "brain" of the Source. Typical implementations of the 
`SplitEnumerator` do the following:
+
+  - `SourceReader` registration handling
+  - `SourceReader` failure handling
+    - The `addSplitsBack()` method will be invoked when a `SourceReader` 
fails. The SplitEnumerator should take back the split assignments that have not 
been acknowledged by the failed `SourceReader`.
+  - `SourceEvent` handling
+    - `SourceEvent`s are custom events sent between `SplitEnumerator` and 
`SourceReader`. The implementation can leverage this mechanism to perform 
sophisticated coordination.  
+  - Split discovery and assignment
+    - The `SplitEnumerator` can assign splits to the `SourceReader`s in 
response to various events, including discovery of new splits, new 
`SourceReader` registration, `SourceReader` failure, etc.
+
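The failure-handling and assignment responsibilities listed above can be illustrated with a stand-alone bookkeeping model. This is a sketch under assumptions, not the Flink interface: the class `SplitBacklogSketch` and its methods are hypothetical, and the model tracks unacknowledged splits itself purely to stay self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone model of an enumerator's split backlog; not Flink API.
final class SplitBacklogSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private final Map<Integer, List<String>> unacknowledged = new HashMap<>();

    SplitBacklogSketch(List<String> initialSplits) {
        pending.addAll(initialSplits);
    }

    /** Assigns the next pending split to the reader, or returns null if none is left. */
    String assignNext(int readerId) {
        String split = pending.poll();
        if (split != null) {
            unacknowledged.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
        }
        return split;
    }

    /** On reader failure, its unacknowledged splits return to the end of the backlog. */
    void onReaderFailure(int readerId) {
        List<String> lost = unacknowledged.remove(readerId);
        if (lost != null) {
            pending.addAll(lost);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Taking splits back into the same backlog means that a later request from any reader can pick them up again, which is one simple way to keep assignment balanced after a failure.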
+A `SplitEnumerator` can accomplish the above work with the help of the 
[SplitEnumeratorContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java)
 which is provided to the `Source` on creation or restore of the 
`SplitEnumerator`. 
+The `SplitEnumeratorContext` allows a `SplitEnumerator` to retrieve the necessary information about the readers and to perform coordination actions.
+The `Source` implementation is expected to pass the `SplitEnumeratorContext` 
to the `SplitEnumerator` instance. 
+
+While a `SplitEnumerator` implementation can work well in a reactive way by only taking coordination actions when its methods are invoked, some `SplitEnumerator` implementations might want to take actions proactively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReaders`.
+Such implementations may find the `callAsync()` method of the `SplitEnumeratorContext` handy. The code snippet below shows how the `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+
+```java
+// MyCheckpoint is a placeholder for the enumerator checkpoint type.
+class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {
+    private static final long DISCOVER_INTERVAL = 60_000L;
+
+    /**
+     * A method to discover the splits.
+     */
+    private List<MySplit> discoverSplits() {...}
+    
+    @Override
+    public void start() {
+        ...
+        // The handler receives the discovered splits, or a non-null error if discovery failed
+        // (error handling is omitted here).
+        enumContext.callAsync(this::discoverSplits, (splits, error) -> {
+            Map<Integer, List<MySplit>> assignments = new HashMap<>();
+            int parallelism = enumContext.currentParallelism();
+            for (MySplit split : splits) {
+                // floorMod keeps the owner index non-negative even for negative hash codes.
+                int owner = Math.floorMod(split.splitId().hashCode(), parallelism);
+                assignments.computeIfAbsent(owner, k -> new ArrayList<>()).add(split);
+            }
+            enumContext.assignSplits(new SplitsAssignment<>(assignments));
+        }, 0L, DISCOVER_INTERVAL);
+        ...
+    }
+    ...
+}
+```
+
+### SourceReader
+
+The 
[SourceReader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java)
 is a component running in the Task Managers to consume the records from the 
Splits. 
+
+The `SourceReader` exposes a pull-based consumption interface. A Flink task 
keeps calling `pollNext(ReaderOutput)` in a loop to poll records from the 
`SourceReader`. The return value of the `pollNext(ReaderOutput)` method 
indicates the status of the source reader.
+
+  - `MORE_AVAILABLE` - The SourceReader has more records available immediately.
+  - `NOTHING_AVAILABLE` - The SourceReader does not have more records 
available at this point, but may have more records in the future.
+  - `END_OF_INPUT` - The SourceReader has exhausted all the records and 
reached the end of data. This means the SourceReader can be closed.
+
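The polling contract described above can be sketched with a toy reader. The sketch is a stand-alone model and makes several assumptions: the `Status` enum is defined locally to mirror the three values listed, a `Consumer<String>` stands in for the `ReaderOutput`, and `ListReader` and `drain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Stand-alone model of the pull-based reader loop; not Flink API.
final class PollLoopSketch {

    /** Local stand-in for the three statuses named in the list above. */
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** A toy reader over an in-memory list that emits at most one record per poll. */
    static final class ListReader {
        private final Iterator<String> records;

        ListReader(List<String> records) {
            this.records = records.iterator();
        }

        Status pollNext(Consumer<String> output) {
            if (!records.hasNext()) {
                return Status.END_OF_INPUT;
            }
            output.accept(records.next());
            return records.hasNext() ? Status.MORE_AVAILABLE : Status.END_OF_INPUT;
        }
    }

    /** Task side: keeps polling until the reader reports the end of its input. */
    static List<String> drain(ListReader reader) {
        List<String> out = new ArrayList<>();
        Status status;
        do {
            status = reader.pollNext(out::add);
            // A real task would wait for new data on NOTHING_AVAILABLE instead of
            // polling again immediately; this toy reader never returns that status.
        } while (status != Status.END_OF_INPUT);
        return out;
    }
}
```

Emitting a single record per `pollNext` call, as this toy reader does, keeps each call short so the polling loop stays responsive.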
+In the interest of performance, a `ReaderOutput` is provided to the 
`pollNext(ReaderOutput)` method, so a `SourceReader` can emit multiple records 
in a single call of pollNext() if it has to. For example, sometimes the 
external system works at the granularity of blocks. A block may contain 
multiple records but the source can only checkpoint at the block boundaries. In 
this case the `SourceReader` can emit all the records in one block at a time to 
the `ReaderOutput`.
+**However, the `SourceReader` implementation should avoid emitting multiple 
records in a single `pollNext(ReaderOutput)` invocation unless necessary.** 
This is because the task thread that is polling from the `SourceReader` works 
in an event-loop and cannot block.
+
+All the state of a `SourceReader` should be maintained inside the `SourceSplit`s which are returned by the `snapshotState()` invocation. Doing this allows the `SourceSplit`s to be reassigned to other `SourceReaders` when needed.
+
+A `SourceReaderContext` is provided to the `Source` when a `SourceReader` is created. It is expected that the `Source` will pass the context to the `SourceReader` instance. The `SourceReader` can send `SourceEvent`s to its `SplitEnumerator` through the `SourceReaderContext`. A typical design pattern of the `Source` is letting the `SourceReader`s report their local information to the `SplitEnumerator`, which has a global view to make decisions.
+
+The `SourceReader` API is a low-level API that allows users to deal with the splits manually and have their own threading model to fetch and hand over the records. To facilitate the `SourceReader` implementation, Flink provides a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class which significantly reduces the amount of work needed to write a `SourceReader`.
+**Connector developers are highly recommended to take advantage of the `SourceReaderBase` instead of writing the `SourceReader`s from scratch**. For more details please check the [Split Reader API](#the-split-reader-api) section.
+
+### Use the Source
+To create a `DataStream` from a `Source`, pass the `Source` to a `StreamExecutionEnvironment`. For example:
+
+{{< tabs "bde5ff60-4e61-4633-a6dc-50413cfd7b45" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+Source mySource = new MySource(...);
+
+DataStream<Integer> stream = env.fromSource(
+        mySource,
+        WatermarkStrategy.noWatermarks(),
+        "MySourceName");
+...
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val mySource = new MySource(...)
+
+val stream = env.fromSource(
+      mySource,
+      WatermarkStrategy.noWatermarks(),
+      "MySourceName")
+...
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+----
+----

Review comment:
       nit: I think we can remove one line. It looks a bit weird and I see no purpose for a double line in this place.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

