This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef21d506bebee0ac6466d4abe2f844ef7c15cde5
Author: Qingsheng Ren <renqs...@gmail.com>
AuthorDate: Tue Jun 1 08:37:57 2021 +0800

    [FLINK-22722][docs/kafka] Add documentation for Kafka new source (#15974)
    
    (cherry picked from commit b582991b8b2b8dadb89e71d5002c4a9cc2055e34)
---
 docs/content/docs/connectors/datastream/kafka.md | 215 ++++++++++++++++++++++-
 1 file changed, 212 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index 192d185..1f21e7d 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -38,10 +38,219 @@ For details on Kafka compatibility, please refer to the official [Kafka document
 
 {{< artifact flink-connector-kafka withScalaVersion >}}
 
+If you are using the Kafka source, ```flink-connector-base``` is also required as a dependency:
+
+{{< artifact flink-connector-base >}}
+
 Flink's streaming connectors are not currently part of the binary distribution.
 See how to link with them for cluster execution [here]({{< ref 
"docs/dev/datastream/project-configuration" >}}).
 
-## Kafka Consumer
+## Kafka Source
+{{< hint info >}}
+This part describes the Kafka source based on the new 
+[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+### Usage
+Kafka source provides a builder class for constructing an instance of KafkaSource. The code snippet
+below shows how to build a KafkaSource to consume messages from the earliest offset of topic
+"input-topic", with consumer group "my-group", deserializing only the value of the message as a string.
+```java
+KafkaSource<String> source = KafkaSource.<String>builder()
+    .setBootstrapServers(brokers)
+    .setTopics("input-topic")
+    .setGroupId("my-group")
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setValueOnlyDeserializer(new SimpleStringSchema())
+    .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+```
+The following properties are **required** for building a KafkaSource:
+- Bootstrap servers, configured by ```setBootstrapServers(String)```
+- Consumer group ID, configured by ```setGroupId(String)```
+- Topics / partitions to subscribe to, see the following
+  <a href="#topic-partition-subscription">Topic-partition subscription</a> for more details.
+- Deserializer to parse Kafka messages, see the following
+  <a href="#deserializer">Deserializer</a> for more details.
+
+### Topic-partition Subscription
+Kafka source provides 3 ways of topic-partition subscription:
+- Topic list, subscribing to messages from all partitions in a list of topics. For example:
+  ```java
+  KafkaSource.builder().setTopics("topic-a", "topic-b")
+  ```
+- Topic pattern, subscribing to messages from all topics whose names match the provided regular
+  expression. For example:
+  ```java
+  KafkaSource.builder().setTopicPattern("topic.*")
+  ```
+- Partition set, subscribing to partitions in the provided partition set. For example:
+  ```java
+  final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
+          new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
+          new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
+  KafkaSource.builder().setPartitions(partitionSet)
+  ```
+### Deserializer
+A deserializer is required for parsing Kafka messages. The deserializer (deserialization schema) can be
+configured by ```setDeserializer(KafkaRecordDeserializationSchema)```, where
+```KafkaRecordDeserializationSchema``` defines how to deserialize a Kafka ```ConsumerRecord```.
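+
+For illustration, here is a rough sketch of a custom ```KafkaRecordDeserializationSchema``` that turns
+each record into a "topic-partition-offset" string. The interface shape (a ```deserialize(ConsumerRecord, Collector)```
+method plus ```getProducedType()```) and the import path are assumptions, so check them against your Flink version:
+```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
+
+// Hypothetical example: emit "topic-partition-offset" for every record.
+public class RecordCoordinatesDeserializationSchema
+        implements KafkaRecordDeserializationSchema<String> {
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
+            throws IOException {
+        out.collect(record.topic() + "-" + record.partition() + "-" + record.offset());
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
+```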
+
+If only the value of Kafka ```ConsumerRecord``` is needed, you can use
+```setValueOnlyDeserializer(DeserializationSchema)``` in the builder, where
+```DeserializationSchema``` defines how to deserialize binaries of Kafka 
message value.
+
+You can also use a <a href="https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html">```Kafka Deserializer```</a>
+for deserializing the Kafka message value. For example, using ```StringDeserializer``` to deserialize
+the Kafka message value as a string:
+```java
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+KafkaSource.<String>builder()
+        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
+```
+
+### Starting Offset
+Kafka source is able to consume messages starting from different offsets by 
specifying
+```OffsetsInitializer```. Built-in initializers include:
+
+```java
+KafkaSource.builder()
+    // Start from the committed offsets of the consuming group, without a reset strategy
+    .setStartingOffsets(OffsetsInitializer.committedOffsets())
+    // Start from the committed offsets, using EARLIEST as the reset strategy if no committed offset exists
+    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
+    // Start from the first record whose timestamp is greater than or equal to the given timestamp
+    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+    // Start from earliest offset
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    // Start from latest offset
+    .setStartingOffsets(OffsetsInitializer.latest())
+```
+
+You can also implement a custom offsets initializer if the built-in initializers above cannot fulfill
+your requirements.
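+
+As a rough, hypothetical sketch (the exact ```OffsetsInitializer``` contract is assumed here and may differ
+in your Flink version), a custom initializer could map every assigned partition to a fixed offset and declare
+its reset strategy through ```OffsetsInitializer#getAutoOffsetResetStrategy()```:
+```java
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+// Hypothetical example: start every assigned partition at a fixed offset (42).
+public class FixedOffsetsInitializer implements OffsetsInitializer {
+
+    @Override
+    public Map<TopicPartition, Long> getPartitionOffsets(
+            Collection<TopicPartition> partitions,
+            PartitionOffsetsRetriever partitionOffsetsRetriever) {
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : partitions) {
+            offsets.put(tp, 42L);
+        }
+        return offsets;
+    }
+
+    @Override
+    public OffsetResetStrategy getAutoOffsetResetStrategy() {
+        return OffsetResetStrategy.EARLIEST;
+    }
+}
+```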
+
+If no offsets initializer is specified, **OffsetsInitializer.earliest()** will be
+used by default.
+
+### Boundedness
+Kafka source is designed to support both streaming and batch execution modes. By default, the KafkaSource
+runs in streaming mode, and thus never stops until the Flink job fails or is cancelled. You can use
+```setBounded(OffsetsInitializer)``` to specify stopping offsets and run the source in
+batch mode. When all partitions have reached their stopping offsets, the source will exit.
+
+You can also keep the KafkaSource running in streaming mode but still stop at the stopping offsets by
+using ```setUnbounded(OffsetsInitializer)```. The source will exit when all partitions reach their
+specified stopping offsets.
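+
+For example, the sketch below builds one source that runs in batch mode and stops at the latest offsets
+observed when the source starts, and another that stays in streaming mode but stops at the same point:
+```java
+KafkaSource.builder()
+    // Batch mode: stop once the latest offsets at startup are reached
+    .setBounded(OffsetsInitializer.latest())
+
+KafkaSource.builder()
+    // Streaming mode, but still stop at the latest offsets at startup
+    .setUnbounded(OffsetsInitializer.latest())
+```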
+
+### Additional Properties
+In addition to the properties described above, you can set arbitrary properties for KafkaSource and
+KafkaConsumer by using ```setProperties(Properties)``` and ```setProperty(String, String)```.
+KafkaSource has the following options for configuration:
+- ```client.id.prefix``` defines the prefix to use for the Kafka consumer's client ID
+- ```partition.discovery.interval.ms``` defines the interval in milliseconds for the Kafka source
+  to discover new partitions. See <a href="#dynamic-partition-discovery">Dynamic Partition Discovery</a>
+  below for more details.
+
+For configurations of KafkaConsumer, you can refer to the
+<a href="http://kafka.apache.org/documentation/#consumerconfigs">Apache Kafka documentation</a>
+for more details.
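+
+For instance, KafkaConsumer options can also be passed in bulk through a ```Properties``` object
+(```max.poll.records``` below is just an example of a standard Kafka consumer option):
+```java
+Properties consumerProps = new Properties();
+consumerProps.setProperty("max.poll.records", "200");
+
+KafkaSource.builder()
+    .setProperties(consumerProps)
+```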
+
+Please note that the following keys will be overridden by the builder even if
+they are configured:
+- ```key.deserializer``` is always set to ```ByteArrayDeserializer```
+- ```value.deserializer``` is always set to ```ByteArrayDeserializer```
+- ```auto.offset.reset.strategy``` is overridden by 
```OffsetsInitializer#getAutoOffsetResetStrategy()```
+  for the starting offsets
+- ```partition.discovery.interval.ms``` is overridden to -1 when
+  ```setBounded(OffsetsInitializer)``` has been invoked
+
+The code snippet below shows configuring the KafkaConsumer to use "PLAIN" as the SASL mechanism and
+provide the JAAS configuration:
+```java
+KafkaSource.builder()
+    .setProperty("sasl.mechanism", "PLAIN")
+    .setProperty("sasl.jaas.config", 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"username\" password=\"password\";")
+```
+
+### Dynamic Partition Discovery
+In order to handle scenarios like topic scale-out or topic creation without restarting the Flink
+job, the Kafka source can be configured to periodically discover new partitions under the provided
+topic-partition subscription pattern. To enable partition discovery, set a non-negative value for
+the property ```partition.discovery.interval.ms```:
+```java
+KafkaSource.builder()
+    .setProperty("partition.discovery.interval.ms", "10000") // discover new 
partitions per 10 seconds
+```
+{{< hint warning >}}
+Partition discovery is **disabled** by default. You need to explicitly set the 
partition discovery
+interval to enable this feature.
+{{< /hint >}}
+
+### Event Time and Watermarks
+By default, the record will use the timestamp embedded in the Kafka ```ConsumerRecord``` as the event
+time. You can define your own ```WatermarkStrategy``` to extract the event time from the record itself,
+and emit watermarks downstream:
+```java
+env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With 
Custom Watermark Strategy")
+```
+[This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
+how to define a ```WatermarkStrategy```.
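+
+For example, a bounded-out-of-orderness strategy that extracts the event time from the record itself could
+look like the sketch below (```MyEvent``` and ```getEventTimeMillis()``` are hypothetical, and
+```kafkaSource``` is assumed to produce ```MyEvent``` records):
+```java
+env.fromSource(
+        kafkaSource,
+        WatermarkStrategy
+                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
+                .withTimestampAssigner((event, recordTimestamp) -> event.getEventTimeMillis()),
+        "Kafka Source With Custom Watermark Strategy");
+```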
+
+### Consumer Offset Committing
+Kafka source commits the current consuming offset when checkpoints are **completed**, to
+ensure consistency between Flink's checkpoint state and the committed offsets on the Kafka brokers.
+
+If checkpointing is not enabled, Kafka source relies on the Kafka consumer's internal automatic periodic
+offset committing logic, configured by ```enable.auto.commit``` and ```auto.commit.interval.ms``` in
+the properties of the Kafka consumer.
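+
+For example, when checkpointing is disabled, periodic committing can be enabled through the consumer
+properties:
+```java
+KafkaSource.builder()
+    .setProperty("enable.auto.commit", "true")
+    .setProperty("auto.commit.interval.ms", "1000")
+```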
+
+Note that Kafka source does **NOT** rely on committed offsets for fault tolerance. Committing offsets
+is only for exposing the progress of the consumer and consuming group for monitoring.
+
+### Behind the Scenes
+{{< hint info >}}
+If you are interested in how the Kafka source works under the design of the new data source API, you may
+want to read this part as a reference. For details about the new data source API, the
+[data source documentation]({{< ref "docs/dev/datastream/sources.md" >}}) and
+<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface">FLIP-27</a>
+provide more detailed discussions.
+{{< /hint >}}
+
+Under the abstraction of the new data source API, Kafka source consists of the 
following components:
+#### Source Split
+A source split in Kafka source represents a partition of a Kafka topic. A Kafka source split consists
+of:
+- The ```TopicPartition``` that the split represents
+- The starting offset of the partition
+- The stopping offset of the partition, only available when the source is running in bounded mode
+
+The state of a Kafka source split also stores the current consuming offset of the partition, and the state
+is converted to an immutable split when the Kafka source reader is snapshotted, assigning the current offset
+as the starting offset of the immutable split.
+
+You can check the classes ```KafkaPartitionSplit``` and ```KafkaPartitionSplitState``` for more details.
+
+#### Split Enumerator
+The split enumerator of the Kafka source is responsible for discovering new splits (partitions) under the
+provided topic-partition subscription pattern, and assigning splits to readers so that they are distributed
+uniformly across subtasks in round-robin style. Note that the split enumerator of the Kafka source
+pushes splits eagerly to source readers, so it does not need to handle split requests from source readers.
+
+#### Source Reader
+The source reader of the Kafka source extends the provided ```SourceReaderBase``` and uses a
+single-thread-multiplexed thread model, which reads multiple assigned splits (partitions) with one
+KafkaConsumer driven by one ```SplitReader```. Messages are deserialized right after they are
+fetched from Kafka in the ```SplitReader```. The state of a split, i.e. the current progress of message
+consumption, is updated by the ```KafkaRecordEmitter```, which is also responsible for assigning the event
+time when the record is emitted downstream.
+
+## Kafka SourceFunction
+{{< hint info >}}
+This part describes the Kafka source based on the legacy SourceFunction API.
+{{< /hint >}}
 
 Flink's Kafka consumer - `FlinkKafkaConsumer` provides access to read from one 
or more Kafka topics.
 
@@ -342,7 +551,7 @@ properties.setProperty("bootstrap.servers", 
"localhost:9092")
 properties.setProperty("group.id", "test")
 
 val myConsumer =
-    new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties);
+    new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties)
 myConsumer.assignTimestampsAndWatermarks(
     WatermarkStrategy.
         .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
@@ -406,7 +615,7 @@ stream.addSink(myProducer)
 {{< /tab >}}
 {{< /tabs >}}
 
-## The `SerializationSchema`
+### The `SerializationSchema`
 
 The Flink Kafka Producer needs to know how to turn Java/Scala objects into 
binary data.
 The `KafkaSerializationSchema` allows users to specify such a schema.
