This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef21d506bebee0ac6466d4abe2f844ef7c15cde5
Author: Qingsheng Ren <renqs...@gmail.com>
AuthorDate: Tue Jun 1 08:37:57 2021 +0800

    [FLINK-22722][docs/kafka] Add documentation for Kafka new source (#15974)

    (cherry picked from commit b582991b8b2b8dadb89e71d5002c4a9cc2055e34)
---
 docs/content/docs/connectors/datastream/kafka.md | 215 ++++++++++++++++++++++-
 1 file changed, 212 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index 192d185..1f21e7d 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -38,10 +38,219 @@ For details on Kafka compatibility, please refer to the official [Kafka document
 
 {{< artifact flink-connector-kafka withScalaVersion >}}
 
+If you are using the Kafka source, ```flink-connector-base``` is also required as a dependency:
+
+{{< artifact flink-connector-base >}}
+
 Flink's streaming connectors are not currently part of the binary distribution.
 See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).
 
-## Kafka Consumer
+## Kafka Source
+{{< hint info >}}
+This part describes the Kafka source based on the new
+[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+### Usage
+Kafka source provides a builder class for constructing an instance of KafkaSource. The code snippet
+below shows how to build a KafkaSource to consume messages from the earliest offset of topic
+"input-topic", with consumer group "my-group", and deserialize only the value of each message as a string.
+```java
+KafkaSource<String> source = KafkaSource.<String>builder()
+    .setBootstrapServers(brokers)
+    .setTopics("input-topic")
+    .setGroupId("my-group")
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setValueOnlyDeserializer(new SimpleStringSchema())
+    .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+```
+The following properties are **required** for building a KafkaSource:
+- Bootstrap servers, configured by ```setBootstrapServers(String)```
+- Consumer group ID, configured by ```setGroupId(String)```
+- Topics / partitions to subscribe to, see the following
+  <a href="#topic-partition-subscription">Topic-partition subscription</a> for more details.
+- Deserializer to parse Kafka messages, see the following
+  <a href="#deserializer">Deserializer</a> for more details.
+
+### Topic-partition Subscription
+Kafka source provides 3 ways of topic-partition subscription:
+- Topic list, subscribing to messages from all partitions in a list of topics. For example:
+  ```java
+  KafkaSource.builder().setTopics("topic-a", "topic-b")
+  ```
+- Topic pattern, subscribing to messages from all topics whose name matches the provided regular
+  expression. For example:
+  ```java
+  KafkaSource.builder().setTopicPattern("topic.*")
+  ```
+- Partition set, subscribing to partitions in the provided partition set. For example:
+  ```java
+  final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
+          new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
+          new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
+  KafkaSource.builder().setPartitions(partitionSet)
+  ```
+### Deserializer
+A deserializer is required for parsing Kafka messages. The deserializer (deserialization schema) can be
+configured by ```setDeserializer(KafkaRecordDeserializationSchema)```, where
+```KafkaRecordDeserializationSchema``` defines how to deserialize a Kafka ```ConsumerRecord```.
+
+If only the value of the Kafka ```ConsumerRecord``` is needed, you can use
+```setValueOnlyDeserializer(DeserializationSchema)``` in the builder, where
+```DeserializationSchema``` defines how to deserialize the binary value of a Kafka message.
+
+You can also use a <a href="https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html">```Kafka Deserializer```</a>
+for deserializing the Kafka message value. For example, to use ```StringDeserializer``` for deserializing the
+Kafka message value as a string:
+```java
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+KafkaSource.<String>builder()
+    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
+```
+
+### Starting Offset
+Kafka source is able to consume messages starting from different offsets by specifying an
+```OffsetsInitializer```. Built-in initializers include:
+
+```java
+KafkaSource.builder()
+    // Start from committed offset of the consuming group, without reset strategy
+    .setStartingOffsets(OffsetsInitializer.committedOffsets())
+    // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
+    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
+    // Start from the first record whose timestamp is greater than or equal to a given timestamp
+    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+    // Start from earliest offset
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    // Start from latest offset
+    .setStartingOffsets(OffsetsInitializer.latest())
+```
+
+You can also implement a custom offsets initializer if the built-in initializers above cannot fulfill
+your requirements.
+
+If no offsets initializer is specified, **OffsetsInitializer.earliest()** will be
+used by default.
+
+### Boundedness
+Kafka source is designed to support both streaming and batch running modes. By default, KafkaSource
+runs in streaming mode, thus it never stops until the Flink job fails or is cancelled. You can use
+```setBounded(OffsetsInitializer)``` to specify stopping offsets and set the source to run in
+batch mode. When all partitions have reached their stopping offsets, the source will exit.
+
+You can also keep KafkaSource running in streaming mode, yet still stop at the stopping offsets, by
+using ```setUnbounded(OffsetsInitializer)```. The source will exit when all partitions reach their
+specified stopping offsets.
+
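+For example, the following sketch of the batch case reuses the builder options from the Usage example
+above (```brokers``` is a placeholder for your bootstrap servers):
+```java
+KafkaSource<String> boundedSource = KafkaSource.<String>builder()
+    .setBootstrapServers(brokers)
+    .setTopics("input-topic")
+    .setGroupId("my-group")
+    .setValueOnlyDeserializer(new SimpleStringSchema())
+    // Stop at the offsets that are the latest at the time the job starts;
+    // in batch mode the source finishes once every partition reaches them.
+    .setBounded(OffsetsInitializer.latest())
+    .build();
+```
+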
+### Additional Properties
+In addition to the properties described above, you can set arbitrary properties for KafkaSource and
+KafkaConsumer by using ```setProperties(Properties)``` and ```setProperty(String, String)```, as shown
+in the sketch below. KafkaSource has the following options for configuration:
+- ```client.id.prefix``` defines the prefix to use for the Kafka consumer's client ID
+- ```partition.discovery.interval.ms``` defines the interval in milliseconds for Kafka source
+  to discover new partitions. See <a href="#dynamic-partition-discovery">Dynamic Partition Discovery</a>
+  below for more details.
+
+For configurations of KafkaConsumer, you can refer to the
+<a href="http://kafka.apache.org/documentation/#consumerconfigs">Apache Kafka documentation</a>
+for more details.
+
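+The snippet below is a minimal sketch of passing a whole ```Properties``` object to the builder. The
+property values are placeholders, and ```max.poll.records``` is just an example of an ordinary
+KafkaConsumer option that is forwarded as-is:
+```java
+Properties props = new Properties();
+// Option interpreted by the Kafka source itself
+props.setProperty("client.id.prefix", "my-flink-job");
+props.setProperty("partition.discovery.interval.ms", "60000");
+// Option forwarded to the underlying KafkaConsumer
+props.setProperty("max.poll.records", "200");
+
+KafkaSource.builder()
+    .setProperties(props)
+```
+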
+Please note that the following keys will be overridden by the builder even if
+they are configured:
+- ```key.deserializer``` is always set to ```ByteArrayDeserializer```
+- ```value.deserializer``` is always set to ```ByteArrayDeserializer```
+- ```auto.offset.reset``` is overridden by ```OffsetsInitializer#getAutoOffsetResetStrategy()```
+  for the starting offsets
+- ```partition.discovery.interval.ms``` is overridden to -1 when
+  ```setBounded(OffsetsInitializer)``` has been invoked
+
+The code snippet below shows how to configure the KafkaConsumer to use "PLAIN" as the SASL mechanism
+and provide a JAAS configuration:
+```java
+KafkaSource.builder()
+    .setProperty("sasl.mechanism", "PLAIN")
+    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
+```
+
+### Dynamic Partition Discovery
+In order to handle scenarios like topic scale-out or topic creation without restarting the Flink
+job, the Kafka source can be configured to periodically discover new partitions under the provided
+topic-partition subscription pattern. To enable partition discovery, set a non-negative value for
+the property ```partition.discovery.interval.ms```:
+```java
+KafkaSource.builder()
+    .setProperty("partition.discovery.interval.ms", "10000") // discover new partitions every 10 seconds
+```
+{{< hint warning >}}
+Partition discovery is **disabled** by default. You need to explicitly set the partition discovery
+interval to enable this feature.
+{{< /hint >}}
+
+### Event Time and Watermarks
+By default, the record will use the timestamp embedded in the Kafka ```ConsumerRecord``` as the event
+time. You can define your own ```WatermarkStrategy``` to extract event time from the record itself,
+and emit watermarks downstream:
+```java
+env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
+```
+[This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
+details about how to define a ```WatermarkStrategy```.
+
+### Consumer Offset Committing
+Kafka source commits the current consuming offset when checkpoints are **completed**, to
+ensure consistency between Flink's checkpoint state and the committed offsets on Kafka brokers.
+
+If checkpointing is not enabled, Kafka source relies on the Kafka consumer's internal automatic periodic
+offset committing logic, configured by ```enable.auto.commit``` and ```auto.commit.interval.ms``` in
+the properties of the Kafka consumer (see the sketch at the end of this section).
+
+Note that Kafka source does **NOT** rely on committed offsets for fault tolerance. Committing offsets
+is only for exposing the progress of the consumer and consuming group for monitoring.
+
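+The sketch below illustrates the two cases described above, assuming a ```StreamExecutionEnvironment```
+named ```env``` as in the earlier snippets; the interval values are illustrative:
+```java
+// With checkpointing enabled, offsets are committed when checkpoints complete.
+env.enableCheckpointing(60000L);
+
+// Without checkpointing, fall back to the Kafka consumer's automatic committing.
+KafkaSource.builder()
+    .setProperty("enable.auto.commit", "true")
+    .setProperty("auto.commit.interval.ms", "10000")
+```
+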
+### Behind the Scenes
+{{< hint info >}}
+If you are interested in how the Kafka source works under the design of the new data source API, you may
+want to read this part as a reference. For details about the new data source API,
+[the documentation of data sources]({{< ref "docs/dev/datastream/sources.md" >}}) and
+<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+provide more descriptive discussions.
+{{< /hint >}}
+
+Under the abstraction of the new data source API, the Kafka source consists of the following components:
+#### Source Split
+A source split in the Kafka source represents a partition of a Kafka topic. A Kafka source split consists
+of:
+- The ```TopicPartition``` the split represents
+- Starting offset of the partition
+- Stopping offset of the partition, only available when the source is running in bounded mode
+
+The state of a Kafka source split also stores the current consuming offset of the partition, and the state
+will be converted to an immutable split when the Kafka source reader is snapshotted, assigning the current
+offset to the starting offset of the immutable split.
+
+You can check the classes ```KafkaPartitionSplit``` and ```KafkaPartitionSplitState``` for more details.
+
+#### Split Enumerator
+The split enumerator of the Kafka source is responsible for discovering new splits (partitions) under the
+provided topic-partition subscription pattern, and assigning splits to readers, uniformly
+distributed across subtasks, in round-robin style. Note that the split enumerator of the Kafka source
+pushes splits eagerly to source readers, so it does not need to handle split requests from source readers.
+
+#### Source Reader
+The source reader of the Kafka source extends the provided ```SourceReaderBase``` and uses a
+single-thread-multiplexed thread model, which reads multiple assigned splits (partitions) with one
+KafkaConsumer driven by one ```SplitReader```. Messages are deserialized right after they are
+fetched from Kafka in the ```SplitReader```. The state of a split, i.e. the current progress of message
+consumption, is updated by ```KafkaRecordEmitter```, which is also responsible for assigning event time
+when the record is emitted downstream.
+
+## Kafka SourceFunction
+{{< hint info >}}
+This part describes the Kafka source based on the legacy ```SourceFunction``` API.
+{{< /hint >}}
 
 Flink's Kafka consumer - `FlinkKafkaConsumer` provides access to read from one or
 more Kafka topics.
@@ -342,7 +551,7 @@ properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 
 val myConsumer =
-  new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties);
+  new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties)
 myConsumer.assignTimestampsAndWatermarks(
   WatermarkStrategy
     .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
@@ -406,7 +615,7 @@ stream.addSink(myProducer)
 {{< /tab >}}
 {{< /tabs >}}
 
-## The `SerializationSchema`
+### The `SerializationSchema`
 
 The Flink Kafka Producer needs to know how to turn Java/Scala objects into binary
 data. The `KafkaSerializationSchema` allows users to specify such a schema.