This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9fc1ac427fad351b997e4219e3579910781c4fa8
Author: Jiangjie (Becket) Qin <jiangjie...@alibaba-inc.com>
AuthorDate: Tue Jun 9 23:35:22 2020 +0800

    [FLINK-10740][docs] Add documentation for FLIP-27 Source API and SplitReader API.
---
 docs/dev/stream/sources.md    | 238 +++++++++++++++++++++++++++++++++++++++---
 docs/dev/stream/sources.zh.md | 238 +++++++++++++++++++++++++++++++++++++++---
 docs/fig/source_reader.svg    |  20 ++++
 3 files changed, 472 insertions(+), 24 deletions(-)

diff --git a/docs/dev/stream/sources.md b/docs/dev/stream/sources.md
index 0257661..040318e 100644
--- a/docs/dev/stream/sources.md
+++ b/docs/dev/stream/sources.md
@@ -96,27 +96,241 @@ The source has a Kafka Topic (or list of Topics or Topic regex) and a *Deseriali
 Same as above, except that each Split (Topic Partition) has a defined end offset. Once the *SourceReader* reaches the end offset for a Split, it finishes that Split. Once all assigned Splits are finished, the SourceReader finishes.

-----
-----
-
 ## The Data Source API
+This section describes the major interfaces of the new Source API introduced in FLIP-27 and provides tips for developers implementing a Source.
+
+### Source
+The [Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java) API is a factory-style interface that creates the following components.
+
+ - *Split Enumerator*
+ - *Source Reader*
+ - *Split Serializer*
+ - *Enumerator Checkpoint Serializer*
+
+In addition, the Source provides the [boundedness](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java) attribute of the source, so that Flink can choose an appropriate mode to run the Flink job.
+
+Source implementations should be serializable, because Source instances are serialized and uploaded to the Flink cluster at runtime.
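To make the factory role and the serializability requirement concrete, here is a minimal plain-Java sketch that runs without Flink on the classpath. All type names in it (`SketchSource`, `SketchEnumerator`, `SketchReader`, `SketchBoundedness`, `FileListSource`, `SourceShipping`) are hypothetical stand-ins rather than Flink's `Source` API, and the Java-serialization round trip only approximates how a source instance is shipped to the cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

/** Hypothetical stand-in for Flink's Boundedness attribute. */
enum SketchBoundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

/** Hypothetical stand-ins for two of the components a Source creates. */
interface SketchEnumerator { List<String> discoverSplits(); }

interface SketchReader { String read(String splitId); }

/** Hypothetical factory-style source: it only holds configuration and creates components on demand. */
interface SketchSource extends Serializable {
    SketchBoundedness getBoundedness();
    SketchEnumerator createEnumerator();
    SketchReader createReader();
}

/** A bounded source over a fixed list of file names; every field must itself be serializable. */
class FileListSource implements SketchSource {
    private static final long serialVersionUID = 1L;
    private final List<String> files;

    FileListSource(List<String> files) {
        this.files = List.copyOf(files); // immutable and serializable
    }

    @Override public SketchBoundedness getBoundedness() { return SketchBoundedness.BOUNDED; }
    @Override public SketchEnumerator createEnumerator() { return () -> files; }
    @Override public SketchReader createReader() { return splitId -> "contents of " + splitId; }
}

class SourceShipping {
    /** Serializes and deserializes a source, roughly what happens when a job is submitted. */
    static SketchSource roundTrip(SketchSource source) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source); // throws NotSerializableException for non-serializable fields
        } catch (IOException e) {
            throw new IllegalStateException("Source is not serializable", e);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchSource) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Cannot restore source", e);
        }
    }

    public static void main(String[] args) {
        SketchSource shipped = roundTrip(new FileListSource(List.of("a.csv", "b.csv")));
        System.out.println(shipped.getBoundedness());                    // BOUNDED
        System.out.println(shipped.createEnumerator().discoverSplits()); // [a.csv, b.csv]
    }
}
```

Keeping only configuration in the source and creating the enumerator and readers on demand keeps the shipped object small; a non-serializable field would surface here as an `IllegalStateException` wrapping a `NotSerializableException`.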
+
+### SplitEnumerator
+The [SplitEnumerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java) is expected to be the "brain" of the Source. Typical implementations of the `SplitEnumerator` do the following:
+
+ - `SourceReader` registration handling
+ - `SourceReader` failure handling
+   - The `addSplitsBack()` method is invoked when a `SourceReader` fails. The `SplitEnumerator` should take back the split assignments that have not been acknowledged by the failed `SourceReader`.
+ - `SourceEvent` handling
+   - `SourceEvent`s are custom events sent between the `SplitEnumerator` and the `SourceReader`s. Implementations can leverage this mechanism to perform sophisticated coordination.
+ - Split discovery and assignment
+   - The `SplitEnumerator` can assign splits to the `SourceReader`s in response to various events, including discovery of new splits, registration of a new `SourceReader`, `SourceReader` failure, etc.
+
+A `SplitEnumerator` can accomplish the above work with the help of the [SplitEnumeratorContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java), which is provided to the `Source` on creation or restore of the `SplitEnumerator`.
+The `SplitEnumeratorContext` allows a `SplitEnumerator` to retrieve necessary information about the readers and to perform coordination actions.
+The `Source` implementation is expected to pass the `SplitEnumeratorContext` to the `SplitEnumerator` instance.
+
+While a `SplitEnumerator` implementation can work well in a reactive way, taking coordination actions only when its methods are invoked, some `SplitEnumerator` implementations might want to take actions proactively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReader`s.
+Such implementations may find the `callAsync()` method of the `SplitEnumeratorContext` handy. The code snippet below shows how a `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {
+    private static final long DISCOVER_INTERVAL = 60_000L;
+
+    // Passed in by the Source when it creates this enumerator.
+    private final SplitEnumeratorContext<MySplit> enumContext;
+
+    /**
+     * A method to discover the splits.
+     */
+    private List<MySplit> discoverSplits() {...}
+
+    @Override
+    public void start() {
+        ...
+        // discoverSplits() is invoked asynchronously; the handler is executed by the source coordinator.
+        enumContext.callAsync(this::discoverSplits, (splits, error) -> {
+            if (error != null) {
+                throw new RuntimeException("Failed to discover splits", error);
+            }
+            Map<Integer, List<MySplit>> assignments = new HashMap<>();
+            int parallelism = enumContext.currentParallelism();
+            for (MySplit split : splits) {
+                // floorMod keeps the owner index non-negative for negative hash codes.
+                int owner = Math.floorMod(split.splitId().hashCode(), parallelism);
+                assignments.computeIfAbsent(owner, k -> new ArrayList<>()).add(split);
+            }
+            enumContext.assignSplits(new SplitsAssignment<>(assignments));
+        }, 0L, DISCOVER_INTERVAL);
+        ...
+    }
+    ...
+}
+{% endhighlight %}
+</div>
+
+### SourceReader
+
+The [SourceReader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java) is a component running in the Task Managers that consumes the records from the Splits.
+
+The `SourceReader` exposes a pull-based consumption interface. A Flink task keeps calling `pollNext(ReaderOutput)` in a loop to poll records from the `SourceReader`. The return value of the `pollNext(ReaderOutput)` method indicates the status of the source reader.

-Source, SourceEnumerator, SourceReader.
+ - `MORE_AVAILABLE` - The SourceReader has more records available immediately.
+ - `NOTHING_AVAILABLE` - The SourceReader does not have more records available at this point, but may have more records in the future.
+ - `END_OF_INPUT` - The SourceReader has exhausted all the records and reached the end of data. This means the SourceReader can be closed.

-env.continuousSource(...).
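The polling contract can be modeled in a few lines of plain Java. This is a sketch under simplifying assumptions: `PollStatus`, `PullReader`, `QueueBackedReader` and `PullLoop` are hypothetical names that only mirror the three statuses listed above, a `Consumer` stands in for the `ReaderOutput`, and the loop gives up after a few idle polls where a real task would instead wait until the reader signals that data is available again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical mirror of the three statuses that pollNext() can return. */
enum PollStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

/** Hypothetical pull-based reader; the Consumer plays the role of ReaderOutput. */
interface PullReader<T> {
    PollStatus pollNext(Consumer<T> output);
}

/** Emits one buffered record per call; finish() marks that no further records will arrive. */
class QueueBackedReader implements PullReader<String> {
    private final Deque<String> pending = new ArrayDeque<>();
    private boolean finished;

    void add(String record) { pending.addLast(record); }

    void finish() { finished = true; }

    @Override
    public PollStatus pollNext(Consumer<String> output) {
        String next = pending.pollFirst();
        if (next == null) {
            return finished ? PollStatus.END_OF_INPUT : PollStatus.NOTHING_AVAILABLE;
        }
        output.accept(next);
        // After the last buffered record of an unfinished reader, more data may still arrive later.
        return pending.isEmpty() && !finished ? PollStatus.NOTHING_AVAILABLE : PollStatus.MORE_AVAILABLE;
    }
}

class PullLoop {
    /**
     * Polls like a task's main loop and returns the number of NOTHING_AVAILABLE results seen.
     * A real task waits for availability instead of giving up after maxIdlePolls.
     */
    static int drain(PullReader<String> reader, List<String> sink, int maxIdlePolls) {
        int idlePolls = 0;
        while (true) {
            switch (reader.pollNext(sink::add)) {
                case MORE_AVAILABLE:
                    break; // poll again right away
                case NOTHING_AVAILABLE:
                    if (++idlePolls >= maxIdlePolls) {
                        return idlePolls;
                    }
                    break;
                case END_OF_INPUT:
                    return idlePolls;
            }
        }
    }

    public static void main(String[] args) {
        QueueBackedReader reader = new QueueBackedReader();
        List<String> sink = new ArrayList<>();
        reader.add("a");
        reader.add("b");
        drain(reader, sink, 1); // returns on NOTHING_AVAILABLE: the reader is not finished yet
        reader.add("c");
        reader.finish();
        drain(reader, sink, 1); // returns on END_OF_INPUT
        System.out.println(sink); // [a, b, c]
    }
}
```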
+In the interest of performance, a `ReaderOutput` is provided to the `pollNext(ReaderOutput)` method, so that a `SourceReader` can emit multiple records in a single `pollNext()` call if it has to. For example, sometimes the external system works at the granularity of blocks. A block may contain multiple records, but the source can only checkpoint at block boundaries. In this case the `SourceReader` can emit one block at a time to the `ReaderOutput`.
+**However, the `SourceReader` implementation should avoid emitting multiple records in a single `pollNext(ReaderOutput)` invocation unless necessary.**

-----
-----
+All the state of a `SourceReader` should be maintained inside the `SourceSplit`s, which are returned by the `snapshotState()` invocation. Doing this allows the `SourceSplit`s to be reassigned to other `SourceReader`s when needed.
+
+A `SourceReaderContext` is provided to the `Source` upon `SourceReader` creation. It is expected that the `Source` will pass the context to the `SourceReader` instance. Through this context, the `SourceReader` can send `SourceEvent`s to its `SplitEnumerator`. A typical design pattern for the `Source` is to let the `SourceReader`s report their local information to the `SplitEnumerator`, which has a global view to make decisions.
+
+The `SourceReader` API is a low-level API that allows users to deal with the splits manually and have their own threading model to fetch and hand over the records. To facilitate the `SourceReader` implementation, Flink has provided a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class which significantly reduces the amount of work needed to write a `Sou [...]
+**It is highly recommended that connector developers take advantage of the `SourceReaderBase` instead of writing `SourceReader`s from scratch**. For more details, please check the [Split Reader API](#the-split-reader-api) section.
+
+### Use the Source
+In order to create a `DataStream` from a `Source`, one needs to pass the `Source` to a `StreamExecutionEnvironment`. For example,
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Source mySource = new MySource(...);
+
+DataStream<Integer> stream = env.continuousSource(
+        mySource,
+        WatermarkStrategy.noWatermarks(),
+        "MySourceName");
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val mySource = new MySource(...)
+
+val stream = env.continuousSource(
+      mySource,
+      WatermarkStrategy.noWatermarks(),
+      "MySourceName")
+...
+{% endhighlight %}
+</div>
+</div>

 ## The Split Reader API
+The core `SourceReader` API is asynchronous and rather low level, leaving users to handle splits manually.
+In practice, many sources perform blocking `poll()` and I/O operations, which need separate threads.
+[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) is the high-level API for such sources.
+The `SplitReader` API is designed to work together with the `SourceReaderBase` class, which is a base implementation for `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+### SplitReader
+The `SplitReader` API only has three methods:
+
+ - A blocking fetch method that returns a [RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java).
+ - A non-blocking method to handle split changes.
+ - A non-blocking wake-up method to wake up the blocking fetch.
+
+The `SplitReader` only focuses on reading the records from the external system, and is therefore much simpler than the `SourceReader`.
+Please check the Javadoc of the class for more details.
+
+### SourceReaderBase
+It is quite common that a `SourceReader` implementation does the following:
+
+ - Have a pool of threads fetching from splits of the external system in a blocking way.
+ - Handle the synchronization between the fetching threads mentioned above and the `pollNext(ReaderOutput)` invocation.
+ - Maintain the per-split watermark for watermark alignment.
+ - Maintain the state of each split for checkpointing.
+
+In order to reduce the work of writing a new `SourceReader`, Flink provides a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class to serve as a base implementation of the `SourceReader`.
+`SourceReaderBase` has all the above work done out of the box. To write a new `SourceReader`, one can just let the `SourceReader` implementation inherit from the `SourceReaderBase`, fill in a few methods, and implement a high-level [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
+
+### SplitFetcherManager
+The `SourceReaderBase` supports a few threading models out of the box, depending on the behavior of the [SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java) it works with.
+The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s, each fetching with a `SplitReader`. It also determines how to assign splits to each split fetcher.
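The handover that `SourceReaderBase` and the `SplitFetcherManager` take care of can be illustrated with the following plain-Java sketch. It is hypothetical and much simpler than Flink's `SplitFetcher` and `FutureCompletingBlockingQueue`; `SketchSplitReader`, `ToySplitReader`, `FetcherHandover` and `HandoverDemo` are illustrative names only. A single fetcher thread calls a blocking `fetch()` on a `SplitReader`-like object and puts each batch into a bounded queue, the task thread drains that queue without blocking as `pollNext()` must, and `wakeUp()` lets a blocked `fetch()` return so the fetcher can shut down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical, trimmed-down version of the three SplitReader responsibilities. */
interface SketchSplitReader {
    List<String> fetch() throws InterruptedException; // blocking
    void addSplits(List<String> splitIds);            // non-blocking split change
    void wakeUp();                                     // non-blocking; makes a blocked fetch() return
}

/** Toy reader: each split yields one record; fetch() blocks until a split or a wake-up arrives. */
class ToySplitReader implements SketchSplitReader {
    private static final String WAKE_UP = "__wake_up__"; // sentinel, assumed never to be a split ID
    private final BlockingQueue<String> splits = new LinkedBlockingQueue<>();

    @Override public List<String> fetch() throws InterruptedException {
        String split = splits.take();
        return split.equals(WAKE_UP) ? List.of() : List.of("record-from-" + split);
    }
    @Override public void addSplits(List<String> splitIds) { splits.addAll(splitIds); }
    @Override public void wakeUp() { splits.add(WAKE_UP); }
}

/** One fetcher thread hands record batches to the task thread through a bounded queue. */
class FetcherHandover implements AutoCloseable {
    private final BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(4);
    private final SketchSplitReader reader;
    private final Thread fetcher;
    private volatile boolean running = true;

    FetcherHandover(SketchSplitReader reader) {
        this.reader = reader;
        this.fetcher = new Thread(this::fetchLoop, "sketch-fetcher");
        fetcher.start();
    }

    private void fetchLoop() {
        try {
            while (running) {
                List<String> batch = reader.fetch();
                if (!batch.isEmpty()) {
                    handover.put(batch); // blocks while the task thread is behind (back pressure)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Non-blocking, like pollNext(): moves at most one batch into the output list. */
    boolean pollNext(List<String> output) {
        List<String> batch = handover.poll();
        if (batch == null) {
            return false;
        }
        output.addAll(batch);
        return true;
    }

    /** Polls until n records arrived or the timeout expired; a real task waits on a future instead. */
    List<String> pollAtLeast(int n, long timeoutMillis) {
        List<String> output = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (output.size() < n && System.nanoTime() - deadline < 0) {
            if (!pollNext(output)) {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
            }
        }
        return output;
    }

    @Override public void close() {
        running = false;
        reader.wakeUp();  // releases a fetch() that is waiting for splits
        handover.clear(); // releases a put() that is waiting for free capacity
        try {
            fetcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class HandoverDemo {
    public static void main(String[] args) {
        ToySplitReader splitReader = new ToySplitReader();
        try (FetcherHandover handover = new FetcherHandover(splitReader)) {
            splitReader.addSplits(List.of("split-0", "split-1"));
            System.out.println(handover.pollAtLeast(2, 5_000)); // [record-from-split-0, record-from-split-1]
        }
    }
}
```

The bounded queue provides back pressure: if the task thread stops polling, the fetcher blocks in `put()` instead of buffering records without limit.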

-core SourceReader API is asynchronous and rather low level, leavers users to handle splits manually.
-in practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
-Split Reader is base implementation for that.
+For example, as illustrated below, a `SplitFetcherManager` may have a fixed number of threads, each fetching from some splits assigned to the `SourceReader`.
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_reader.svg" alt="One fetcher per split threading model." />
+</div>
+The following code snippet implements this threading model.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * A SplitFetcherManager that has a fixed number of split fetchers and assigns splits
+ * to the split fetchers based on the hash code of split IDs.
+ */
+public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
+        extends SplitFetcherManager<E, SplitT> {
+    private final int numFetchers;
+
+    public FixedSizeSplitFetcherManager(
+            int numFetchers,
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+        super(futureNotifier, elementsQueue, splitReaderSupplier);
+        this.numFetchers = numFetchers;
+        // Create numFetchers split fetchers.
+        for (int i = 0; i < numFetchers; i++) {
+            startFetcher(createSplitFetcher());
+        }
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splitsToAdd) {
+        // Group splits by their owner fetchers.
+        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
+        splitsToAdd.forEach(split -> {
+            // Hash the split ID; floorMod keeps the index non-negative.
+            int ownerFetcherIndex = Math.floorMod(split.splitId().hashCode(), numFetchers);
+            splitsByFetcherIndex
+                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
+                    .add(split);
+        });
+        // Assign the splits to their owner fetcher.
+        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
+            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
+        });
+    }
+}
+{% endhighlight %}
+</div>
+
+A `SourceReader` using this threading model can then be created as follows:
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
+        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+    public FixedFetcherSizeSourceReader(
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(
+                futureNotifier,
+                elementsQueue,
+                new FixedSizeSplitFetcherManager<>(
+                        config.getInteger(SourceConfig.NUM_FETCHERS),
+                        futureNotifier,
+                        elementsQueue,
+                        splitFetcherSupplier),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    protected void onSplitFinished(Collection<String> finishedSplitIds) {
+        // Do something in the callback for the finished splits.
+    }
+
+    @Override
+    protected SplitStateT initializedState(SplitT split) {
+        ...
+    }
+
+    @Override
+    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
+        ...
+    }
+}
+{% endhighlight %}
+</div>

-----
-----
+As the example shows, `SourceReader` implementations can also easily implement their own threading models on top of the `SplitFetcherManager` and `SourceReaderBase`.

 ## Event Time and Watermarks
diff --git a/docs/dev/stream/sources.zh.md b/docs/dev/stream/sources.zh.md
index 0257661..040318e 100644
--- a/docs/dev/stream/sources.zh.md
+++ b/docs/dev/stream/sources.zh.md
@@ -96,27 +96,241 @@ The source has a Kafka Topic (or list of Topics or Topic regex) and a *Deseriali
 Same as above, except that each Split (Topic Partition) has a defined end offset. Once the *SourceReader* reaches the end offset for a Split, it finishes that Split. Once all assigned Splits are finished, the SourceReader finishes.

-----
-----
-
 ## The Data Source API
+This section describes the major interfaces of the new Source API introduced in FLIP-27 and provides tips for developers implementing a Source.
+
+### Source
+The [Source](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java) API is a factory-style interface that creates the following components.
+
+ - *Split Enumerator*
+ - *Source Reader*
+ - *Split Serializer*
+ - *Enumerator Checkpoint Serializer*
+
+In addition, the Source provides the [boundedness](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/Boundedness.java) attribute of the source, so that Flink can choose an appropriate mode to run the Flink job.
+
+Source implementations should be serializable, because Source instances are serialized and uploaded to the Flink cluster at runtime.
+
+### SplitEnumerator
+The [SplitEnumerator](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java) is expected to be the "brain" of the Source. Typical implementations of the `SplitEnumerator` do the following:
+
+ - `SourceReader` registration handling
+ - `SourceReader` failure handling
+   - The `addSplitsBack()` method is invoked when a `SourceReader` fails. The `SplitEnumerator` should take back the split assignments that have not been acknowledged by the failed `SourceReader`.
+ - `SourceEvent` handling
+   - `SourceEvent`s are custom events sent between the `SplitEnumerator` and the `SourceReader`s. Implementations can leverage this mechanism to perform sophisticated coordination.
+ - Split discovery and assignment
+   - The `SplitEnumerator` can assign splits to the `SourceReader`s in response to various events, including discovery of new splits, registration of a new `SourceReader`, `SourceReader` failure, etc.
+
+A `SplitEnumerator` can accomplish the above work with the help of the [SplitEnumeratorContext](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java), which is provided to the `Source` on creation or restore of the `SplitEnumerator`.
+The `SplitEnumeratorContext` allows a `SplitEnumerator` to retrieve necessary information about the readers and to perform coordination actions.
+The `Source` implementation is expected to pass the `SplitEnumeratorContext` to the `SplitEnumerator` instance.
+
+While a `SplitEnumerator` implementation can work well in a reactive way, taking coordination actions only when its methods are invoked, some `SplitEnumerator` implementations might want to take actions proactively. For example, a `SplitEnumerator` may want to periodically run split discovery and assign the new splits to the `SourceReader`s.
+Such implementations may find the `callAsync()` method of the `SplitEnumeratorContext` handy. The code snippet below shows how a `SplitEnumerator` implementation can achieve that without maintaining its own threads.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+class MySplitEnumerator implements SplitEnumerator<MySplit, MyCheckpoint> {
+    private static final long DISCOVER_INTERVAL = 60_000L;
+
+    // Passed in by the Source when it creates this enumerator.
+    private final SplitEnumeratorContext<MySplit> enumContext;
+
+    /**
+     * A method to discover the splits.
+     */
+    private List<MySplit> discoverSplits() {...}
+
+    @Override
+    public void start() {
+        ...
+        // discoverSplits() is invoked asynchronously; the handler is executed by the source coordinator.
+        enumContext.callAsync(this::discoverSplits, (splits, error) -> {
+            if (error != null) {
+                throw new RuntimeException("Failed to discover splits", error);
+            }
+            Map<Integer, List<MySplit>> assignments = new HashMap<>();
+            int parallelism = enumContext.currentParallelism();
+            for (MySplit split : splits) {
+                // floorMod keeps the owner index non-negative for negative hash codes.
+                int owner = Math.floorMod(split.splitId().hashCode(), parallelism);
+                assignments.computeIfAbsent(owner, k -> new ArrayList<>()).add(split);
+            }
+            enumContext.assignSplits(new SplitsAssignment<>(assignments));
+        }, 0L, DISCOVER_INTERVAL);
+        ...
+    }
+    ...
+}
+{% endhighlight %}
+</div>
+
+### SourceReader
+
+The [SourceReader](https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java) is a component running in the Task Managers that consumes the records from the Splits.
+
+The `SourceReader` exposes a pull-based consumption interface. A Flink task keeps calling `pollNext(ReaderOutput)` in a loop to poll records from the `SourceReader`. The return value of the `pollNext(ReaderOutput)` method indicates the status of the source reader.

-Source, SourceEnumerator, SourceReader.
+ - `MORE_AVAILABLE` - The SourceReader has more records available immediately.
+ - `NOTHING_AVAILABLE` - The SourceReader does not have more records available at this point, but may have more records in the future.
+ - `END_OF_INPUT` - The SourceReader has exhausted all the records and reached the end of data. This means the SourceReader can be closed.

-env.continuousSource(...).
+In the interest of performance, a `ReaderOutput` is provided to the `pollNext(ReaderOutput)` method, so that a `SourceReader` can emit multiple records in a single `pollNext()` call if it has to. For example, sometimes the external system works at the granularity of blocks. A block may contain multiple records, but the source can only checkpoint at block boundaries. In this case the `SourceReader` can emit one block at a time to the `ReaderOutput`.
+**However, the `SourceReader` implementation should avoid emitting multiple records in a single `pollNext(ReaderOutput)` invocation unless necessary.**

-----
-----
+All the state of a `SourceReader` should be maintained inside the `SourceSplit`s, which are returned by the `snapshotState()` invocation. Doing this allows the `SourceSplit`s to be reassigned to other `SourceReader`s when needed.
+
+A `SourceReaderContext` is provided to the `Source` upon `SourceReader` creation. It is expected that the `Source` will pass the context to the `SourceReader` instance. Through this context, the `SourceReader` can send `SourceEvent`s to its `SplitEnumerator`. A typical design pattern for the `Source` is to let the `SourceReader`s report their local information to the `SplitEnumerator`, which has a global view to make decisions.
+
+The `SourceReader` API is a low-level API that allows users to deal with the splits manually and have their own threading model to fetch and hand over the records. To facilitate the `SourceReader` implementation, Flink has provided a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class which significantly reduces the amount of work needed to write a `Sou [...]
+**It is highly recommended that connector developers take advantage of the `SourceReaderBase` instead of writing `SourceReader`s from scratch**. For more details, please check the [Split Reader API](#the-split-reader-api) section.
+
+### Use the Source
+In order to create a `DataStream` from a `Source`, one needs to pass the `Source` to a `StreamExecutionEnvironment`. For example,
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+Source mySource = new MySource(...);
+
+DataStream<Integer> stream = env.continuousSource(
+        mySource,
+        WatermarkStrategy.noWatermarks(),
+        "MySourceName");
+...
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val mySource = new MySource(...)
+
+val stream = env.continuousSource(
+      mySource,
+      WatermarkStrategy.noWatermarks(),
+      "MySourceName")
+...
+{% endhighlight %}
+</div>
+</div>

 ## The Split Reader API
+The core `SourceReader` API is asynchronous and rather low level, leaving users to handle splits manually.
+In practice, many sources perform blocking `poll()` and I/O operations, which need separate threads.
+[SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java) is the high-level API for such sources.
+The `SplitReader` API is designed to work together with the `SourceReaderBase` class, which is a base implementation for `SourceReader`s. The `SourceReaderBase` takes a split reader supplier and creates fetcher threads with various consumption threading models.
+
+### SplitReader
+The `SplitReader` API only has three methods:
+
+ - A blocking fetch method that returns a [RecordsWithSplitIds](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java).
+ - A non-blocking method to handle split changes.
+ - A non-blocking wake-up method to wake up the blocking fetch.
+
+The `SplitReader` only focuses on reading the records from the external system, and is therefore much simpler than the `SourceReader`.
+Please check the Javadoc of the class for more details.
+
+### SourceReaderBase
+It is quite common that a `SourceReader` implementation does the following:
+
+ - Have a pool of threads fetching from splits of the external system in a blocking way.
+ - Handle the synchronization between the fetching threads mentioned above and the `pollNext(ReaderOutput)` invocation.
+ - Maintain the per-split watermark for watermark alignment.
+ - Maintain the state of each split for checkpointing.
+
+In order to reduce the work of writing a new `SourceReader`, Flink provides a [SourceReaderBase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java) class to serve as a base implementation of the `SourceReader`.
+`SourceReaderBase` has all the above work done out of the box. To write a new `SourceReader`, one can just let the `SourceReader` implementation inherit from the `SourceReaderBase`, fill in a few methods, and implement a high-level [SplitReader](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java).
+
+### SplitFetcherManager
+The `SourceReaderBase` supports a few threading models out of the box, depending on the behavior of the [SplitFetcherManager](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java) it works with.
+The `SplitFetcherManager` helps create and maintain a pool of `SplitFetcher`s, each fetching with a `SplitReader`. It also determines how to assign splits to each split fetcher.

-core SourceReader API is asynchronous and rather low level, leavers users to handle splits manually.
-in practice, many sources have perform blocking poll() and I/O operations, needing separate threads.
-Split Reader is base implementation for that.
+For example, as illustrated below, a `SplitFetcherManager` may have a fixed number of threads, each fetching from some splits assigned to the `SourceReader`.
+<div style="text-align: center">
+  <img width="70%" src="{{ site.baseurl }}/fig/source_reader.svg" alt="One fetcher per split threading model." />
+</div>
+The following code snippet implements this threading model.
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * A SplitFetcherManager that has a fixed number of split fetchers and assigns splits
+ * to the split fetchers based on the hash code of split IDs.
+ */
+public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit>
+        extends SplitFetcherManager<E, SplitT> {
+    private final int numFetchers;
+
+    public FixedSizeSplitFetcherManager(
+            int numFetchers,
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
+        super(futureNotifier, elementsQueue, splitReaderSupplier);
+        this.numFetchers = numFetchers;
+        // Create numFetchers split fetchers.
+        for (int i = 0; i < numFetchers; i++) {
+            startFetcher(createSplitFetcher());
+        }
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splitsToAdd) {
+        // Group splits by their owner fetchers.
+        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
+        splitsToAdd.forEach(split -> {
+            // Hash the split ID; floorMod keeps the index non-negative.
+            int ownerFetcherIndex = Math.floorMod(split.splitId().hashCode(), numFetchers);
+            splitsByFetcherIndex
+                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
+                    .add(split);
+        });
+        // Assign the splits to their owner fetcher.
+        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
+            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
+        });
+    }
+}
+{% endhighlight %}
+</div>
+
+A `SourceReader` using this threading model can then be created as follows:
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
+        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+    public FixedFetcherSizeSourceReader(
+            FutureNotifier futureNotifier,
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(
+                futureNotifier,
+                elementsQueue,
+                new FixedSizeSplitFetcherManager<>(
+                        config.getInteger(SourceConfig.NUM_FETCHERS),
+                        futureNotifier,
+                        elementsQueue,
+                        splitFetcherSupplier),
+                recordEmitter,
+                config,
+                context);
+    }
+
+    @Override
+    protected void onSplitFinished(Collection<String> finishedSplitIds) {
+        // Do something in the callback for the finished splits.
+    }
+
+    @Override
+    protected SplitStateT initializedState(SplitT split) {
+        ...
+    }
+
+    @Override
+    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
+        ...
+    }
+}
+{% endhighlight %}
+</div>

-----
-----
+As the example shows, `SourceReader` implementations can also easily implement their own threading models on top of the `SplitFetcherManager` and `SourceReaderBase`.

 ## Event Time and Watermarks
diff --git a/docs/fig/source_reader.svg b/docs/fig/source_reader.svg
new file mode 100644
index 0000000..1d4d9ac
--- /dev/null
+++ b/docs/fig/source_reader.svg
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<svg width="1257" height="653" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" overflow="hidden"><defs>[...]
\ No newline at end of file