This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d87f68b534 [Improve][Docs][Kafka]Reconstruct the kafka connector document (#4778)
d87f68b534 is described below
commit d87f68b534cbc8e56f6970c8d962bf6ebf5e8641
Author: monster <[email protected]>
AuthorDate: Wed Aug 2 10:06:18 2023 +0800
[Improve][Docs][Kafka]Reconstruct the kafka connector document (#4778)
* [Docs][Connector-V2][Kafka]Reconstruct the kafka connector document
---------
Co-authored-by: chenzy15 <[email protected]>
---
docs/en/connector-v2/sink/Kafka.md | 151 +++++++++++++-----------------
docs/en/connector-v2/source/kafka.md | 172 +++++++++++------------------------
2 files changed, 115 insertions(+), 208 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kafka.md
b/docs/en/connector-v2/sink/Kafka.md
index f971e5390b..1e258a058a 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -1,36 +1,52 @@
# Kafka
> Kafka sink connector
->
- ## Description
-Write Rows to a Kafka topic.
+## Supported Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
-## Key features
+## Key Features
- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+> By default, we will use two-phase commit (2PC) to guarantee the message is sent to Kafka exactly once.
+
+## Description
+
+Write Rows to a Kafka topic.
-By default, we will use 2pc to guarantee the message is sent to kafka exactly once.
+## Supported DataSource Info
-## Options
+In order to use the Kafka connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central repository.
-| name | type | required | default value |
-|----------------------|--------|----------|---------------|
-| topic | string | yes | - |
-| bootstrap.servers | string | yes | - |
-| kafka.config | map | no | - |
-| semantics | string | no | NON |
-| partition_key_fields | array | no | - |
-| partition | int | no | - |
-| assign_partitions | array | no | - |
-| transaction_prefix | string | no | - |
-| format | String | no | json |
-| field_delimiter | String | no | , |
-| common-options | config | no | - |
+| Datasource | Supported Versions | Maven |
+|------------|--------------------|-------|
+| Kafka | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) |
-### topic [string]
+## Sink Options
-Kafka Topic.
+| Name | Type | Required | Default | Description |
+|------|------|----------|---------|-------------|
+| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. |
+| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
+| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
+| semantics | String | No | NON | Semantics that can be chosen: EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
+| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. |
+| partition | Int | No | - | We can specify the partition; all messages will be sent to this partition. |
+| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. |
+| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction. Kafka distinguishes different transactions by different transactionIds. This parameter is the prefix of the Kafka transactionId; make sure different jobs use different prefixes. |
+| format | String | No | json | Data format. The default format is json. Optional formats are text, canal-json and debezium-json. If you use json or text format, the default field separator is ", "; if you customize the delimiter, add the "field_delimiter" option. If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
+| field_delimiter | String | No | , | Customize the field delimiter for data format. |
+| common-options | Config | No | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. |
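As a small illustration of the `format` and `field_delimiter` options described above (a sketch; the topic and broker values are placeholders):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Write rows as delimited text instead of the default json
    format = text
    # Use "|" instead of the default "," between fields
    field_delimiter = "|"
  }
}
```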
+
+## Parameter Interpretation
+
+### Topic Formats
Currently two formats are supported:
@@ -47,27 +63,13 @@ Currently two formats are supported:
If `${name}` is set as the topic, the first row is sent to the Jack topic, and the second row is sent to the Mary topic.
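The extracted-topic behaviour described above could be configured like this (a sketch; the `name` field follows the example rows, and the broker address is a placeholder):

```hocon
sink {
  kafka {
    # "${name}" is replaced per row with the value of the upstream "name" field,
    # so rows are routed to the Jack and Mary topics respectively
    topic = "${name}"
    bootstrap.servers = "localhost:9092"
  }
}
```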
-### bootstrap.servers [string]
-
-Kafka Brokers List.
-
-### kafka.config [kafka producer config]
-
-In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs).
-
-### semantics [string]
-
-Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
+### Semantics
In EXACTLY_ONCE, producer will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint.
-
In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
-
NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
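For reference, a minimal sink block choosing the EXACTLY_ONCE semantic might look like this (the topic and prefix values are illustrative):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # EXACTLY_ONCE wraps writes in Kafka transactions committed on checkpoints
    semantics = EXACTLY_ONCE
    # Keep transactionIds of different jobs from colliding
    transaction_prefix = "seatunnel_job_1_"
  }
}
```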
-### partition_key_fields [array]
-
-Configure which fields are used as the key of the kafka message.
+### Partition Key Fields
For example, if you want to use the value of fields from upstream data as the key, you can assign field names to this property.
@@ -79,55 +81,48 @@ Upstream data is the following:
| Mary | 23 | data-example2 |
If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
-
If no partition key fields are set, a null message key will be sent.
-
The format of the message key is json. If name is set as the key, it will be, for example, '{"name":"Jack"}'.
-
The selected field must be an existing field in the upstream.
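The key behaviour described above could be configured as follows (a sketch, assuming the upstream example rows with a `name` field):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Each message gets the json key '{"name":"..."}', so rows with the same
    # name hash to the same partition
    partition_key_fields = ["name"]
  }
}
```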
-### partition [int]
-
-We can specify the partition, all messages will be sent to this partition.
-
-### assign_partitions [array]
-
-We can decide which partition to send based on the content of the message. The
function of this parameter is to distribute information.
+### Assign Partitions
For example, there are five partitions in total, and the assign_partitions field in config is as follows:
assign_partitions = ["shoe", "clothing"]
-
Then the message containing "shoe" will be sent to partition zero, because "shoe" is subscribed as zero in assign_partitions, and the message containing "clothing" will be sent to partition one. For other messages, the hash algorithm will be used to divide them into the remaining partitions.
-
This function is implemented by the `MessageContentPartitioner` class, which implements the `org.apache.kafka.clients.producer.Partitioner` interface. If we need custom partitioning, we need to implement this interface as well.
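Placed in a sink block, the example above looks like this (topic and broker values are placeholders):

```hocon
sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Messages containing "shoe" go to partition 0, "clothing" to partition 1;
    # other messages are hashed across the remaining partitions
    assign_partitions = ["shoe", "clothing"]
  }
}
```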
-### transaction_prefix [string]
-
-If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction.
-Kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix.
-
-### format
+## Task Example
-Data format. The default format is json. Optional text format, canal-json and debezium-json.
-If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
-If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
-If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
+### Simple
-### field_delimiter
+> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to Kafka Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target topic test_topic will also contain 16 rows of data. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/d [...]
-Customize the field delimiter for data format.
-
-### common options [config]
-
-Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+```hocon
+# Defining the runtime environment
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
-## Examples
+source {
+ FakeSource {
+ parallelism = 1
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
-```hocon
sink {
-
kafka {
- topic = "seatunnel"
+ topic = "test_topic"
bootstrap.servers = "localhost:9092"
partition = 3
format = json
@@ -139,7 +134,6 @@ sink {
buffer.memory = 33554432
}
}
-
}
```
@@ -162,7 +156,6 @@ sink {
sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule
required \nusername=${username}\npassword=${password};"
}
}
-
}
```
@@ -199,22 +192,6 @@ sink {
sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
}
-
}
```
-## Changelog
-
-### 2.3.0-beta 2022-10-20
-
-- Add Kafka Sink Connector
-
-### next version
-
-- [Improve] Support to specify multiple partition keys
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
-- [Improve] Add text format for kafka sink connector
[3711](https://github.com/apache/incubator-seatunnel/pull/3711)
-- [Improve] Support extract topic from SeaTunnelRow fields
[3742](https://github.com/apache/incubator-seatunnel/pull/3742)
-- [Improve] Change Connector Custom Config Prefix To Map
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
-- [Improve] Support read canal format message
[3950](https://github.com/apache/incubator-seatunnel/pull/3950)
-- [Improve] Support read debezium format message
[3981](https://github.com/apache/incubator-seatunnel/pull/3981)
-
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index 2ed6ec6f12..16b9c5420b 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -2,11 +2,13 @@
> Kafka source connector
-## Description
+## Supported Engines
-Source connector for Apache Kafka.
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
-## Key features
+## Key Features
- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
@@ -15,111 +17,54 @@ Source connector for Apache Kafka.
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-## Options
-
-| name | type | required | default
value |
-|-------------------------------------|---------|----------|--------------------------|
-| topic | String | yes | -
|
-| bootstrap.servers | String | yes | -
|
-| pattern | Boolean | no | false
|
-| consumer.group | String | no |
SeaTunnel-Consumer-Group |
-| commit_on_checkpoint | Boolean | no | true
|
-| kafka.config | Map | no | -
|
-| common-options | config | no | -
|
-| schema | | no | -
|
-| format | String | no | json
|
-| format_error_handle_way | String | no | fail
|
-| field_delimiter | String | no | ,
|
-| start_mode | String | no | group_offsets
|
-| start_mode.offsets | | no |
|
-| start_mode.timestamp | Long | no |
|
-| partition-discovery.interval-millis | long | no | -1
|
-
-### topic [string]
-
-`Kafka topic` name. If there are multiple `topics`, use `,` to split, for
example: `"tpc1,tpc2"`.
-
-### bootstrap.servers [string]
-
-`Kafka` cluster address, separated by `","`.
-
-### pattern [boolean]
-
-If `pattern` is set to `true`, the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer.
-
-### consumer.group [string]
-
-`Kafka consumer group id`, used to distinguish different consumer groups.
-
-### commit_on_checkpoint [boolean]
-
-If true the consumer's offset will be periodically committed in the background.
-
-## partition-discovery.interval-millis [long]
-
-The interval for dynamically discovering topics and partitions.
-
-### kafka.config [map]
-
-In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs).
-
-### common-options [config]
-
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
-
-### schema
-
-The structure of the data, including field names and field types.
-
-## format
-
-Data format. The default format is json. Optional text format, canal-json and debezium-json.
-If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
-If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
-If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
-
-## format_error_handle_way
-
-The processing method of data format error. The default value is fail, and the optional value is (fail, skip).
-When fail is selected, data format error will block and an exception will be thrown.
-When skip is selected, data format error will skip this line data.
-
-## field_delimiter
-
-Customize the field delimiter for data format.
-
-## start_mode
-
-The initial consumption pattern of consumers,there are several types:
-[earliest],[group_offsets],[latest],[specific_offsets],[timestamp]
-
-## start_mode.timestamp
-
-The time required for consumption mode to be "timestamp".
-
-## start_mode.offsets
-
-The offset required for consumption mode to be specific_offsets.
-
-for example:
+## Description
-```hocon
-start_mode.offsets = {
- info-0 = 70
- info-1 = 10
- info-2 = 10
- }
-```
+Source connector for Apache Kafka.
-## Example
+## Supported DataSource Info
+
+In order to use the Kafka connector, the following dependencies are required.
+They can be downloaded via install-plugin.sh or from the Maven central repository.
+
+| Datasource | Supported Versions | Maven |
+|------------|--------------------|-------|
+| Kafka | Universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-kafka) |
+
+## Source Options
+
+| Name | Type | Required | Default | Description |
+|------|------|----------|---------|-------------|
+| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic lists for source by separating topics with a comma, like 'topic-1,topic-2'. |
+| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
+| pattern | Boolean | No | false | If `pattern` is set to `true`, the topic is treated as a regular expression, and all topics with names that match the specified regular expression will be subscribed by the consumer. |
+| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
+| commit_on_checkpoint | Boolean | No | true | If true, the consumer's offset will be periodically committed in the background. |
+| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple non-mandatory `consumer` client parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
+| schema | Config | No | - | The structure of the data, including field names and field types. |
+| format | String | No | json | Data format. The default format is json. Optional formats are text, canal-json and debezium-json. If you use json or text format, the default field separator is ", "; if you customize the delimiter, add the "field_delimiter" option. If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
+| format_error_handle_way | String | No | fail | The processing method of data format errors. The default value is fail, and the optional values are fail and skip. When fail is selected, a data format error will block the job and an exception will be thrown. When skip is selected, a data format error will skip this line of data. |
+| field_delimiter | String | No | , | Customize the field delimiter for data format. |
+| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. |
+| start_mode.offsets | Config | No | - | The offsets required when the consumption mode is specific_offsets. |
+| start_mode.timestamp | Long | No | - | The timestamp required when the consumption mode is "timestamp". |
+| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
+| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
+
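A sketch of the `start_mode` and `start_mode.offsets` options described above, reusing the partition-offset map shape from the earlier docs (topic, broker, and offset values are illustrative):

```hocon
source {
  Kafka {
    topic = "topic_1"
    bootstrap.servers = "localhost:9092"
    # Start reading from explicit offsets instead of the committed group offsets
    start_mode = "specific_offsets"
    # Keys are "<topic>-<partition>", values are the starting offsets
    start_mode.offsets = {
      topic_1-0 = 70
      topic_1-1 = 10
    }
  }
}
```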
+## Task Example
### Simple
+> This example reads the data of kafka's topic_1, topic_2, topic_3 and prints it to the client. If you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunn [...]
+
```hocon
+# Defining the runtime environment
+env {
+ # You can set flink configuration here
+ execution.parallelism = 2
+ job.mode = "BATCH"
+}
source {
-
Kafka {
- result_table_name = "kafka_name"
schema = {
fields {
name = "string"
@@ -136,8 +81,10 @@ source {
auto.offset.reset = "earliest"
enable.auto.commit = "false"
}
- }
-
+ }
+}
+sink {
+ Console {}
}
```
@@ -145,14 +92,12 @@ source {
```hocon
source {
-
Kafka {
topic = ".*seatunnel*."
pattern = "true"
bootstrap.servers = "localhost:9092"
consumer.group = "seatunnel_group"
}
-
}
```
@@ -169,7 +114,7 @@ source {
kafka.config = {
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
-sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};"
+sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
#security.protocol=SASL_SSL
#sasl.mechanism=AWS_MSK_IAM
#sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule
required;"
@@ -205,7 +150,7 @@ source {
kafka.config = {
#security.protocol=SASL_SSL
#sasl.mechanism=SCRAM-SHA-512
-#sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};"
+#sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule
required;"
@@ -215,18 +160,3 @@ source {
}
```
-## Changelog
-
-### 2.3.0-beta 2022-10-20
-
-- Add Kafka Source Connector
-
-### Next Version
-
-- [Improve] Support setting read starting offset or time at startup config
([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
-- [Improve] Support for dynamic discover topic & partition in streaming mode
([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
-- [Improve] Change Connector Custom Config Prefix To Map
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
-- [Bug] Fixed the problem that parsing the offset format failed when the
startup mode was
offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
-- [Improve] Support read canal format message
[3950](https://github.com/apache/incubator-seatunnel/pull/3950)
-- [Improve] Support read debezium format message
[3981](https://github.com/apache/incubator-seatunnel/pull/3981)
-