PatrickRen commented on a change in pull request #15974:
URL: https://github.com/apache/flink/pull/15974#discussion_r639531666



##########
File path: docs/content/docs/connectors/datastream/kafka.md
##########
@@ -38,10 +38,215 @@ For details on Kafka compatibility, please refer to the official [Kafka document
 
 {{< artifact flink-connector-kafka withScalaVersion >}}
 
+If you are using Kafka source, ```flink-connector-base``` is also required as a dependency:
+
+{{< artifact flink-connector-base >}}
+
 Flink's streaming connectors are not currently part of the binary distribution.
See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).
 
-## Kafka Consumer
+## Kafka Source
+{{< hint info >}}
+This part describes the Kafka source based on the new 
+[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
+{{< /hint >}}
+
+### Usage
+Kafka source provides a builder class for constructing an instance of KafkaSource. The code snippet
+below shows how to build a KafkaSource to consume messages from the earliest offset of topic
+"input-topic", with consumer group "my-group", and deserialize only the value of each message as a string.
+```java
+KafkaSource<String> source = KafkaSource.<String>builder()
+    .setBootstrapServers(brokers)
+    .setTopics("input-topic")
+    .setGroupId("my-group")
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    .setValueOnlyDeserializer(new SimpleStringSchema())
+    .build();
+
+env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+```
+The following properties are **required** for building a KafkaSource:
+- Bootstrap servers, configured by ```setBootstrapServers(String)```
+- Consumer group ID, configured by ```setGroupId(String)```
+- Topics / partitions to subscribe, see the following
+  <a href="#topic-partition-subscription">Topic-partition subscription</a> for more details.
+- Deserializer to parse Kafka messages, see the following
+  <a href="#deserializer">Deserializer</a> for more details.
+
+### Topic-partition Subscription
+Kafka source provides three ways of topic-partition subscription:
+- Topic list, subscribing to messages from all partitions in a list of topics. For example:
+  ```java
+  KafkaSource.builder().setTopics("topic-a", "topic-b")
+  ```
+- Topic pattern, subscribing to messages from all topics whose name matches the provided regular
+  expression. For example:
+  ```java
+  KafkaSource.builder().setTopicPattern("topic.*")
+  ```
+- Partition set, subscribing to all partitions in the provided partition set. For example:
+  ```java
+  final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
+          new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
+          new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
+  KafkaSource.builder().setPartitions(partitionSet)
+  ```
+### Deserializer
+A deserializer is required for parsing Kafka messages. The deserializer (deserialization schema) can be
+configured by ```setDeserializer(KafkaRecordDeserializationSchema)```, where
+```KafkaRecordDeserializationSchema``` defines how to deserialize a Kafka ```ConsumerRecord```.
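+
+If you need access to the message key or the record metadata as well, you can implement
+```KafkaRecordDeserializationSchema``` directly. Below is a minimal sketch that emits
+"key=value" strings (the class name ```KeyValueDeserializationSchema``` is illustrative,
+assuming the interface exposes ```deserialize``` and ```getProducedType```):
+```java
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+// Illustrative schema that keeps both the key and the value of each record.
+public class KeyValueDeserializationSchema implements KafkaRecordDeserializationSchema<String> {
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
+        // Kafka keys may be absent; guard against null before decoding.
+        String key = record.key() == null ? "" : new String(record.key(), StandardCharsets.UTF_8);
+        String value = new String(record.value(), StandardCharsets.UTF_8);
+        out.collect(key + "=" + value);
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        // Tells Flink the type of records produced by this schema.
+        return Types.STRING;
+    }
+}
+```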
+
+If only the value of the Kafka ```ConsumerRecord``` is needed, you can use
+```setValueOnlyDeserializer(DeserializationSchema)``` in the builder, where
+```DeserializationSchema``` defines how to deserialize the binary value of a Kafka message.
+
+You can also use a <a href="https://kafka.apache.org/24/javadoc/org/apache/kafka/common/serialization/Deserializer.html">```Kafka Deserializer```</a>
+for deserializing the Kafka message value. For example, using ```StringDeserializer``` to deserialize
+the Kafka message value as a string:
+```java
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+KafkaSource.<String>builder()
+        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
+```
+
+### Starting Offset
+Kafka source is able to consume messages starting from different offsets by specifying
+```OffsetsInitializer```. Built-in initializers include:
+
+```java
+KafkaSource.builder()
+    // Start from committed offset of the consuming group, without reset strategy
+    .setStartingOffsets(OffsetsInitializer.committedOffsets())
+    // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
+    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
+    // Start from the first record whose timestamp is greater than or equal to the given timestamp
+    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
+    // Start from earliest offset
+    .setStartingOffsets(OffsetsInitializer.earliest())
+    // Start from latest offset
+    .setStartingOffsets(OffsetsInitializer.latest())
+```
+
+You can also implement a custom offsets initializer if the built-in initializers above cannot fulfill
+your requirements, as in the sketch below.
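+
+For example, a minimal sketch of a custom initializer that starts every subscribed partition at
+a fixed offset (the class name ```FixedOffsetsInitializer``` is illustrative, assuming the
+interface exposes ```getPartitionOffsets``` and ```getAutoOffsetResetStrategy```):
+```java
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
+// Illustrative initializer that starts every partition at the same fixed offset.
+public class FixedOffsetsInitializer implements OffsetsInitializer {
+
+    private final long startingOffset;
+
+    public FixedOffsetsInitializer(long startingOffset) {
+        this.startingOffset = startingOffset;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> getPartitionOffsets(
+            Collection<TopicPartition> partitions,
+            PartitionOffsetsRetriever partitionOffsetsRetriever) {
+        // Assign the same starting offset to every partition discovered by the enumerator.
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : partitions) {
+            offsets.put(tp, startingOffset);
+        }
+        return offsets;
+    }
+
+    @Override
+    public OffsetResetStrategy getAutoOffsetResetStrategy() {
+        // Fall back to the earliest offset if the fixed offset is out of range.
+        return OffsetResetStrategy.EARLIEST;
+    }
+}
+```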
+
+If no offsets initializer is specified, **OffsetsInitializer.earliest()** will be
+used by default.
+
+### Boundedness
+Kafka source is designed to support both streaming and batch running modes. By default, the KafkaSource
+runs in streaming mode, and thus never stops until the Flink job fails or is cancelled. You can use
+```setBounded(OffsetsInitializer)``` to specify stopping offsets and set the source to run in
+batch mode. When all partitions have reached their stopping offsets, the source will exit.
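+
+For example, a minimal sketch that makes the source bounded, stopping at the latest offsets of
+each partition observed when the source starts:
+```java
+KafkaSource.builder()
+    // Stop at the latest offsets determined at startup, so the source
+    // runs as a bounded (batch) source and exits once they are reached.
+    .setBounded(OffsetsInitializer.latest());
+```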

Review comment:
       Fixed in ce7e539




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

