PatrickRen commented on a change in pull request #15974: URL: https://github.com/apache/flink/pull/15974#discussion_r639532217
########## File path: docs/content/docs/connectors/datastream/kafka.md ##########
@@ -38,10 +38,215 @@ For details on Kafka compatibility, please refer to the official [Kafka document

{{< artifact flink-connector-kafka withScalaVersion >}}

If you are using the Kafka source, ```flink-connector-base``` is also required as a dependency:

{{< artifact flink-connector-base >}}

Flink's streaming connectors are not currently part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).

## Kafka Source
{{< hint info >}}
This part describes the Kafka source based on the new
[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
{{< /hint >}}

### Usage
Kafka source provides a builder class for constructing an instance of KafkaSource. The code snippet
below shows how to build a KafkaSource that consumes messages from the earliest offset of topic
"input-topic" with consumer group "my-group", deserializing only the value of each message as a string.
```java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```
The following properties are **required** for building a KafkaSource:
- Bootstrap servers, configured by ```setBootstrapServers(String)```
- Consumer group ID, configured by ```setGroupId(String)```
- Topics / partitions to subscribe to, see the following
  <a href="#topic-partition-subscription">Topic-partition subscription</a> for more details.
- Deserializer to parse Kafka messages, see the following
  <a href="#deserializer">Deserializer</a> for more details.

### Topic-partition Subscription
Kafka source provides 3 ways of topic-partition subscription:
- Topic list, subscribing to messages from all partitions in a list of topics. For example:
  ```java
  KafkaSource.builder().setTopics("topic-a", "topic-b")
  ```
- Topic pattern, subscribing to messages from all topics whose name matches the provided regular
  expression. For example:
  ```java
  KafkaSource.builder().setTopicPattern("topic.*")
  ```
- Partition set, subscribing to the partitions in the provided partition set. For example:
  ```java
  final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
          new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
          new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
  KafkaSource.builder().setPartitions(partitionSet)
  ```

### Deserializer
A deserializer is required for parsing Kafka messages. The deserializer (deserialization schema) can be
configured by ```setDeserializer(KafkaRecordDeserializationSchema)```, where
```KafkaRecordDeserializationSchema``` defines how to deserialize a Kafka ```ConsumerRecord```.

If only the value of the Kafka ```ConsumerRecord``` is needed, you can use
```setValueOnlyDeserializer(DeserializationSchema)``` in the builder, where
```DeserializationSchema``` defines how to deserialize the binary value of a Kafka message.

You can also use a <a href="https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html">```Kafka Deserializer```</a>
for deserializing the Kafka message value. For example, using ```StringDeserializer``` to deserialize
the Kafka message value as a string:
```java
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
```
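
If you also need the record key or metadata such as topic, partition and offset, you can implement
```KafkaRecordDeserializationSchema``` yourself. The snippet below is a minimal sketch, assuming the
interface's ```deserialize(ConsumerRecord<byte[], byte[]>, Collector<T>)``` and ```getProducedType()```
contract; the class name and output format here are hypothetical:
```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical example: emits "topic-partition@offset: value" for every record.
public class RecordWithMetadataDeserializer implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
        String value = record.value() == null
                ? "<null>"
                : new String(record.value(), StandardCharsets.UTF_8);
        out.collect(record.topic() + "-" + record.partition() + "@" + record.offset() + ": " + value);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
```
An instance of such a class can then be passed to ```setDeserializer(new RecordWithMetadataDeserializer())```
in the builder.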

### Starting Offset
Kafka source is able to consume messages starting from different offsets by specifying an
```OffsetsInitializer```. Built-in initializers include:

```java
KafkaSource.builder()
    // Start from committed offset of the consuming group, without reset strategy
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // Start from the first record whose timestamp is greater than or equal to the given timestamp
    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
    // Start from earliest offset
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Start from latest offset
    .setStartingOffsets(OffsetsInitializer.latest())
```

You can also implement a custom offsets initializer if the built-in initializers above cannot fulfill
your requirements.

If no offsets initializer is specified, **OffsetsInitializer.earliest()** will be used by default.

### Boundedness
Kafka source is designed to support both streaming and batch execution. By default, the KafkaSource
runs in streaming mode, i.e. it never stops until the Flink job fails or is cancelled. You can use
```setBounded(OffsetsInitializer)``` to specify stopping offsets and run the source in
batch mode. When all partitions have reached their stopping offsets, the source will exit.

You can also run the KafkaSource in streaming mode, yet still stop at a stopping offset, by
using ```setUnbounded(OffsetsInitializer)```. The source will exit when all partitions reach their
specified stopping offsets.
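
For illustration, a sketch of the two variants described above (the stopping offsets chosen here are
arbitrary examples):
```java
// Batch mode: consume up to the latest offsets observed at startup, then exit.
KafkaSource.<String>builder()
    .setBounded(OffsetsInitializer.latest());

// Streaming mode with a stopping point: run as an unbounded source, but exit once
// every partition has reached the record timestamp given below.
KafkaSource.<String>builder()
    .setUnbounded(OffsetsInitializer.timestamp(1592323200L));
```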

### Additional Properties
In addition to the properties described above, you can set arbitrary properties for the KafkaSource and
its KafkaConsumer by using ```setProperties(Properties)``` and ```setProperty(String, String)```.
KafkaSource has the following options for configuration:
- ```client.id.prefix``` defines the prefix to use for the Kafka consumer's client ID
- ```partition.discovery.interval.ms``` defines the interval in milliseconds for the Kafka source
  to discover new partitions. See <a href="#dynamic-partition-discovery">Dynamic Partition Discovery</a>
  below for more details.

For configurations of the KafkaConsumer, you can refer to the
<a href="http://kafka.apache.org/documentation/#consumerconfigs">Apache Kafka documentation</a>
for more details.

Please note that the following keys will be overridden by the builder even if they are configured:
- ```key.deserializer``` is always set to ```ByteArrayDeserializer```
- ```value.deserializer``` is always set to ```ByteArrayDeserializer```
- ```auto.offset.reset.strategy``` is overridden by ```OffsetsInitializer#getAutoOffsetResetStrategy()```
  for the starting offsets
- ```partition.discovery.interval.ms``` is overridden to -1 when
  ```setBounded(OffsetsInitializer)``` has been invoked

The code snippet below shows configuring the KafkaConsumer to use "PLAIN" as the SASL mechanism and
providing the JAAS configuration:
```java
KafkaSource.builder()
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
```

### Dynamic Partition Discovery
In order to handle scenarios like topic scale-out or topic creation without restarting the Flink
job, the Kafka source can be configured to periodically discover new partitions under the provided
topic-partition subscription pattern. Partition discovery is disabled by default. To enable this feature,
set a non-negative value for the property ```partition.discovery.interval.ms```.

Review comment:
   Added a warning box in ce7e539

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org