PatrickRen commented on a change in pull request #15974:
URL: https://github.com/apache/flink/pull/15974#discussion_r639531666
########## File path: docs/content/docs/connectors/datastream/kafka.md ##########
@@ -38,10 +38,215 @@ For details on Kafka compatibility, please refer to the official [Kafka document

{{< artifact flink-connector-kafka withScalaVersion >}}

If you are using the Kafka source, ```flink-connector-base``` is also required as a dependency:

{{< artifact flink-connector-base >}}

Flink's streaming connectors are not currently part of the binary distribution.
See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).

## Kafka Source
{{< hint info >}}
This part describes the Kafka source based on the new
[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
{{< /hint >}}

### Usage
Kafka source provides a builder class for constructing an instance of KafkaSource. The code snippet
below shows how to build a KafkaSource that consumes messages from the earliest offset of topic
"input-topic", with consumer group "my-group", deserializing only the value of each message as a string.
```java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```
The following properties are **required** for building a KafkaSource:
- Bootstrap servers, configured by ```setBootstrapServers(String)```
- Consumer group ID, configured by ```setGroupId(String)```
- Topics / partitions to subscribe to, see the
  <a href="#topic-partition-subscription">Topic-partition subscription</a> section below for more details.
- Deserializer to parse Kafka messages, see the
  <a href="#deserializer">Deserializer</a> section below for more details.

### Topic-partition Subscription
Kafka source provides 3 ways of topic-partition subscription:
- Topic list, subscribing to messages from all partitions in a list of topics. For example:
  ```java
  KafkaSource.builder().setTopics("topic-a", "topic-b")
  ```
- Topic pattern, subscribing to messages from all topics whose name matches the provided regular
  expression. For example:
  ```java
  KafkaSource.builder().setTopicPattern("topic.*")
  ```
- Partition set, subscribing to the partitions in the provided partition set. For example:
  ```java
  final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
          new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
          new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
  KafkaSource.builder().setPartitions(partitionSet)
  ```

### Deserializer
A deserializer is required for parsing Kafka messages. The deserializer (deserialization schema) can be
configured by ```setDeserializer(KafkaRecordDeserializationSchema)```, where
```KafkaRecordDeserializationSchema``` defines how to deserialize a Kafka ```ConsumerRecord```.

If only the value of the Kafka ```ConsumerRecord``` is needed, you can use
```setValueOnlyDeserializer(DeserializationSchema)``` in the builder, where
```DeserializationSchema``` defines how to deserialize the binary value of a Kafka message.

You can also use a <a href="https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html">```Kafka Deserializer```</a>
for deserializing the Kafka message value. For example, to use ```StringDeserializer``` for
deserializing the Kafka message value as a string:
```java
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
```

### Starting Offset
Kafka source is able to consume messages starting from different offsets by specifying an
```OffsetsInitializer```. Built-in initializers include:

```java
KafkaSource.builder()
    // Start from the committed offsets of the consumer group, without a reset strategy
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // Start from committed offsets, falling back to EARLIEST if no committed offset exists
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // Start from the first record whose timestamp is greater than or equal to the given timestamp
    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
    // Start from the earliest offset
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Start from the latest offset
    .setStartingOffsets(OffsetsInitializer.latest())
```

You can also implement a custom offsets initializer if the built-in initializers above cannot fulfill
your requirements.

If no offsets initializer is specified, **OffsetsInitializer.earliest()** will be
used by default.

### Boundedness
Kafka source is designed to support both streaming and batch running modes. By default, the KafkaSource
is set to run in streaming mode, thus it never stops until the Flink job fails or is cancelled. You can use
```setBounded(OffsetsInitializer)``` to specify stopping offsets and set the source to run in
batch mode. When all partitions have reached their stopping offsets, the source will exit.
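As an illustration, here is a minimal sketch of a bounded source built with the calls above. It reuses the ```brokers``` variable and the topic / group names from the Usage snippet; those names are placeholders for this example, not part of the API:
```java
KafkaSource<String> boundedSource = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Stop at the latest offset of each partition, as observed when the
    // source starts; once stopping offsets are set, the source is bounded.
    .setBounded(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
```
With stopping offsets configured this way, the same pipeline can be run as a finite, batch-style job over the records that existed in the topic when the job started.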
Review comment:
   Fixed in ce7e539

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org