This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f8a6d31c4e4e180421517f7e700c731e58be37d8
Author: Thomas Weise <[email protected]>
AuthorDate: Sun Aug 22 13:56:13 2021 -0700

    [FLINK-22791][docs] Documentation for HybridSource
---
 .../docs/connectors/datastream/hybridsource.md | 100 +++++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md
new file mode 100644
index 0000000..02f5077
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -0,0 +1,100 @@
---
title: Hybrid Source
weight: 8
type: docs
aliases:
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Hybrid Source

`HybridSource` is a source that contains a list of concrete [sources]({{< ref "docs/dev/datastream/sources" >}}).
It solves the problem of sequentially reading input from heterogeneous sources to produce a single input stream.

For example, a bootstrap use case may need to read several days' worth of bounded input from S3 before continuing with the latest unbounded input from Kafka.
`HybridSource` switches from `FileSource` to `KafkaSource` when the bounded file input finishes, without interrupting the application.
Prior to `HybridSource`, it was necessary to create a topology with multiple sources and to define the switching mechanism in user code, which led to operational complexity and inefficiency.

With `HybridSource`, the multiple sources appear as a single source in the Flink job graph and from the `DataStream` API perspective.

For more background, see [FLIP-150](https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source).

To use the connector, add the `flink-connector-base` dependency to your project:

{{< artifact flink-connector-base >}}

(It typically comes as a transitive dependency of the concrete sources.)

## Start position for next source

To arrange multiple sources in a `HybridSource`, all sources except the last one need to be bounded, and therefore the sources typically need to be assigned a start and an end position. The last source may itself be bounded, in which case the `HybridSource` is bounded, and unbounded otherwise.
The details depend on the specific source and the external storage systems.

Here we cover the most basic scenario and then a more complex one, following the File/Kafka example.

#### Fixed start position at graph construction time

Example: Read files up to a pre-determined switch time and then continue reading from Kafka.
Each source covers a range that is known upfront, and therefore the contained sources can be created upfront as if they were used directly:

```java
long switchTimestamp = ...; // derive from file input paths
FileSource<String> fileSource =
    FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
KafkaSource<String> kafkaSource =
    KafkaSource.<String>builder()
        .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
        .build();
HybridSource<String> hybridSource =
    HybridSource.builder(fileSource)
        .addSource(kafkaSource)
        .build();
```

#### Dynamic start position at switch time

Example: The file source reads a very large backlog, potentially taking longer than the retention available for the next source.
The switch needs to occur at "current time - X", which requires the start time for the next source to be set at switch time.
This calls for transferring the end position from the previous file enumerator for deferred construction of the `KafkaSource`,
by implementing `SourceFactory`.

Note that enumerators need to support getting the end timestamp, which may currently require a source customization.
Adding support for dynamic end position to `FileSource` is tracked in [FLINK-23633](https://issues.apache.org/jira/browse/FLINK-23633).

```java
FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();
HybridSource<String> hybridSource =
    HybridSource.<String, CustomFileSplitEnumerator>builder(fileSource)
        .addSource(
            switchContext -> {
              CustomFileSplitEnumerator previousEnumerator =
                  switchContext.getPreviousEnumerator();
              // how to get the timestamp depends on the specific enumerator
              long switchTimestamp = previousEnumerator.getEndTimestamp();
              KafkaSource<String> kafkaSource =
                  KafkaSource.<String>builder()
                      .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                      .build();
              return kafkaSource;
            },
            Boundedness.CONTINUOUS_UNBOUNDED)
        .build();
```
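
In either case, the built hybrid source is then attached to the job like any other source and is consumed as a single stream. A minimal sketch, assuming a `hybridSource` built as above; the watermark strategy and the operator name `"hybrid-source"` are illustrative choices, not prescribed by `HybridSource`:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From the DataStream API perspective the hybrid source is a single source;
// the watermark strategy and source name below are illustrative assumptions.
DataStream<String> stream =
    env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source");
```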
