This is an automated email from the ASF dual-hosted git repository.
chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 908003e2f0 [Improve][Docs] Fix Kafka connector documentation errors (#11006)
908003e2f0 is described below
commit 908003e2f02b87fbee319d909035ed65edcfaecf
Author: cosmosni <[email protected]>
AuthorDate: Fri Jun 5 13:29:07 2026 +0800
[Improve][Docs] Fix Kafka connector documentation errors (#11006)
Co-authored-by: xuepeng <[email protected]>
---
docs/en/connectors/changelog/connector-kafka.md | 2 +-
docs/en/connectors/sink/Kafka.md | 9 ++-------
docs/en/connectors/source/Kafka.md | 2 +-
docs/zh/connectors/changelog/connector-kafka.md | 2 +-
docs/zh/connectors/sink/Kafka.md | 11 +++--------
docs/zh/connectors/source/Kafka.md | 8 ++++----
6 files changed, 12 insertions(+), 22 deletions(-)
diff --git a/docs/en/connectors/changelog/connector-kafka.md b/docs/en/connectors/changelog/connector-kafka.md
index 85e8d10c6e..d99303db9b 100644
--- a/docs/en/connectors/changelog/connector-kafka.md
+++ b/docs/en/connectors/changelog/connector-kafka.md
@@ -5,7 +5,7 @@
|[Fix][Connector-V2] Optimize start mode of kafka recovery job (#9736)|https://github.com/apache/seatunnel/commit/bbde7f6339|2.3.12|
|[Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671)|https://github.com/apache/seatunnel/commit/9212a77140|2.3.12|
|[Fix][Connector-V2] Add Filter for Partitions to Prevent Blocking in KafkaConsumer StreamMode (#9598)|https://github.com/apache/seatunnel/commit/bd24fa77cb|2.3.12|
-|[Fix][Connecotr-kafka] Fix kafka IllegalArgumentException when offset is -1 (#9376)|https://github.com/apache/seatunnel/commit/142aca7b70|2.3.12|
+|[Fix][Connector-kafka] Fix kafka IllegalArgumentException when offset is -1 (#9376)|https://github.com/apache/seatunnel/commit/142aca7b70|2.3.12|
|[Feature][Connectors-V2] Add end_timestamp for timstamp start mode (#9318)|https://github.com/apache/seatunnel/commit/68b0504da9|2.3.11|
|[Bugifx][kafka] Fix kafka enumerator assign split NPE (#9220)|https://github.com/apache/seatunnel/commit/7ca0c0c7e4|2.3.11|
| [Fix][Connector-V2] Fix kafka database name (#9201)|https://github.com/apache/seatunnel/commit/79d9a937ee|2.3.11|
diff --git a/docs/en/connectors/sink/Kafka.md b/docs/en/connectors/sink/Kafka.md
index ce3b8bebc4..42e5cda945 100644
--- a/docs/en/connectors/sink/Kafka.md
+++ b/docs/en/connectors/sink/Kafka.md
@@ -43,9 +43,9 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. [...]
| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. [...]
| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction,kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. [...]
-| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, ogg_json , avro and native.If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for [...]
+| format | String | No | json | Data format. The default format is json. Optional text, canal_json, debezium_json, compatible_debezium_json, ogg_json, maxwell_json, avro, protobuf and native. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debez [...]
| field_delimiter | String | No | , | Customize the field delimiter for data format. [...]
-| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../common-options/sink-common-options.md) for details [...]
+| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../common-options/sink-common-options.md) for details [...]
| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name [...]
| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition [...]
@@ -149,7 +149,6 @@ sink {
topic = "test_topic"
bootstrap.servers = "localhost:9092"
format = json
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
@@ -193,7 +192,6 @@ sink {
format = json
partition_key_fields = ["name"]
kafka_headers_fields = ["source", "traceId"]
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
@@ -214,7 +212,6 @@ sink {
topic = "seatunnel"
bootstrap.servers = "localhost:9092"
format = json
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
security.protocol=SASL_SSL
@@ -248,7 +245,6 @@ sink {
topic = "seatunnel"
bootstrap.servers = "localhost:9092"
format = json
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
security.protocol=SASL_SSL
@@ -296,7 +292,6 @@ sink {
topic = "test_protobuf_topic_fake_source"
bootstrap.servers = "kafkaCluster:9092"
format = protobuf
- kafka.request.timeout.ms = 60000
kafka.config = {
acks = "all"
request.timeout.ms = 60000
diff --git a/docs/en/connectors/source/Kafka.md b/docs/en/connectors/source/Kafka.md
index c66e197cce..35ec94cbec 100644
--- a/docs/en/connectors/source/Kafka.md
+++ b/docs/en/connectors/source/Kafka.md
@@ -52,7 +52,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. [...]
| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. [...]
| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". [...]
-| start_mode.end_timestamp | Long | No | - | The end time required for consumption mode to be "timestamp" in batch mode
+| start_mode.end_timestamp | Long | No | - | The end time required for consumption mode to be "timestamp" in batch mode |
| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. [...]
| ignore_no_leader_partition | Boolean | No | false | Whether to ignore partitions that have no leader. If set to true, partitions without a leader will be skipped during partition discovery. If set to false (default), the connector will include all partitions regardless of leader status. This is useful when dealing with Kafka clusters that may have temporary leadership issues. [...]
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../common-options/source-common-options.md) for details [...]
diff --git a/docs/zh/connectors/changelog/connector-kafka.md b/docs/zh/connectors/changelog/connector-kafka.md
index 85e8d10c6e..d99303db9b 100644
--- a/docs/zh/connectors/changelog/connector-kafka.md
+++ b/docs/zh/connectors/changelog/connector-kafka.md
@@ -5,7 +5,7 @@
|[Fix][Connector-V2] Optimize start mode of kafka recovery job (#9736)|https://github.com/apache/seatunnel/commit/bbde7f6339|2.3.12|
|[Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671)|https://github.com/apache/seatunnel/commit/9212a77140|2.3.12|
|[Fix][Connector-V2] Add Filter for Partitions to Prevent Blocking in KafkaConsumer StreamMode (#9598)|https://github.com/apache/seatunnel/commit/bd24fa77cb|2.3.12|
-|[Fix][Connecotr-kafka] Fix kafka IllegalArgumentException when offset is -1 (#9376)|https://github.com/apache/seatunnel/commit/142aca7b70|2.3.12|
+|[Fix][Connector-kafka] Fix kafka IllegalArgumentException when offset is -1 (#9376)|https://github.com/apache/seatunnel/commit/142aca7b70|2.3.12|
|[Feature][Connectors-V2] Add end_timestamp for timstamp start mode (#9318)|https://github.com/apache/seatunnel/commit/68b0504da9|2.3.11|
|[Bugifx][kafka] Fix kafka enumerator assign split NPE (#9220)|https://github.com/apache/seatunnel/commit/7ca0c0c7e4|2.3.11|
| [Fix][Connector-V2] Fix kafka database name (#9201)|https://github.com/apache/seatunnel/commit/79d9a937ee|2.3.11|
diff --git a/docs/zh/connectors/sink/Kafka.md b/docs/zh/connectors/sink/Kafka.md
index 0addb1f3ee..57bba9420c 100644
--- a/docs/zh/connectors/sink/Kafka.md
+++ b/docs/zh/connectors/sink/Kafka.md
@@ -43,7 +43,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
| partition | Int | 否 | - | 可以指定分区,所有消息都会发送到此分区 |
| assign_partitions | Array | 否 | - | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息 |
| transaction_prefix | String | 否 | - | 如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀 |
-| format | String | 否 | json | 数据格式。默认格式是json。可选文本格式,canal-json、debezium-json 、 avro 、 protobuf 和native。如果使用 json 或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅 [debezium-json](../formats/debezium-json.md) 了解详细信息 |
+| format | String | 否 | json | 数据格式。默认格式是json。可选 text, canal_json, debezium_json, compatible_debezium_json, ogg_json, maxwell_json, avro, protobuf 和 native。如果使用 json 或 text 格式,默认字段分隔符是 `,`。如果自定义分隔符,请添加 `field_delimiter` 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md)。如果使用 debezium 格式,请参阅 [debezium-json](../formats/debezium-json.md) 了解详细信息 |
| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符 |
| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项 ](../common-options/sink-common-options.md) 了解详情 |
|protobuf_message_name|String|否|-| format配置为protobuf时生效,取Message名称 |
@@ -150,7 +150,6 @@ sink {
topic = "test_topic"
bootstrap.servers = "localhost:9092"
format = json
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
@@ -171,7 +170,6 @@ sink {
topic = "seatunnel"
bootstrap.servers = "localhost:9092"
format = json
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
security.protocol=SASL_SSL
@@ -206,7 +204,6 @@ sink {
topic = "seatunnel"
bootstrap.servers = "localhost:9092"
format = json
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
security.protocol=SASL_SSL
@@ -222,15 +219,14 @@ sink {
请在启动 SeaTunnel 之前设置 JVM 参数 `java.security.krb5.conf` 或更新 `/etc/krb5.conf` 中的默认 `krb5.conf`。
-源配置示例:
+接收器配置示例:
```hocon
-source {
+sink {
Kafka {
topic = "seatunnel"
bootstrap.servers = "localhost:9092"
format = json
- kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
kafka.config = {
security.protocol = SASL_PLAINTEXT
@@ -255,7 +251,6 @@ sink {
topic = "test_protobuf_topic_fake_source"
bootstrap.servers = "kafkaCluster:9092"
format = protobuf
- kafka.request.timeout.ms = 60000
kafka.config = {
acks = "all"
request.timeout.ms = 60000
diff --git a/docs/zh/connectors/source/Kafka.md b/docs/zh/connectors/source/Kafka.md
index 90cef2a8dd..6e592c51f8 100644
--- a/docs/zh/connectors/source/Kafka.md
+++ b/docs/zh/connectors/source/Kafka.md
@@ -49,10 +49,10 @@ import ChangeLog from '../changelog/connector-kafka.md';
| format_error_handle_way | String | 否 | fail | 数据格式错误的处理方式。默认值为 fail,可选值为 fail 和 skip。当选择 fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。 |
| debezium_record_table_filter | Config | 否 | - | 用于过滤 debezium 格式的数据,仅当格式设置为 `debezium_json` 时使用。请参阅下面的 `debezium_record_table_filter` |
| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符。 |
-| start_mode | StartMode[earliest],[group_offsets] | 否 | group_offsets | 消费者的初始消费模式。 |
+| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | 否 | group_offsets | 消费者的初始消费模式。 |
| start_mode.offsets | Config | 否 | - | 用于 specific_offsets 消费模式的偏移量。 |
| start_mode.timestamp | Long | 否 | - | 用于 "timestamp" 消费模式的时间。 |
-| start_mode.end_timestamp | Long | 否 | - | 用于 "timestamp" 消费模式的结束时间,只支持批模式 |
+| start_mode.end_timestamp | Long | 否 | - | 用于 "timestamp" 消费模式的结束时间,只支持批模式。 |
| partition-discovery.interval-millis | Long | 否 | -1 | 动态发现主题和分区的间隔时间。 |
| ignore_no_leader_partition | Boolean | 否 | false | 是否忽略没有 leader 的分区。如果设置为 true,在分区发现过程中将跳过没有 leader 的分区。如果设置为 false(默认值),连接器将包含所有分区,无论 leader 状态如何。这在处理可能存在临时 leader 问题的 Kafka 集群时很有用。 |
| common-options | | 否 | - | 源插件的常见参数,详情请参考 [Source Common Options](../common-options/source-common-options.md)。 |
@@ -60,7 +60,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
| protobuf_schema | String | 否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。 |
| strip_schema_registry_header | Boolean | 否 | false | 当格式设置为 protobuf 时有效。是否在 Protobuf 反序列化之前去除 Confluent Schema Registry 线格式头部(magic byte、schema id 和 message indexes)。当消费使用 Confluent Schema Registry 编码的 Protobuf 消息时,此选项非常有用。启用后,连接器将尝试在解析 Protobuf 消息之前检测并删除 Schema Registry 头部。如果未检测到头部,它将回退到标准的 Protobuf 反序列化。 [...]
| reader_cache_queue_size | Integer | 否 | 2 | Fetcher 与 Reader 线程之间缓冲队列的容量。每个元素是一次 `consumer.poll()` 的整批结果,而非单条消息。详见 [reader_cache_queue_size](#reader_cache_queue_size)。 |
-| is_native | Boolean | No | false | 支持保留record的源信息。 |
+| is_native | Boolean | 否 | false | 支持保留record的源信息。 |
> 从 checkpoint 或 savepoint 恢复时,Kafka Source 会优先使用 checkpoint 中保存的 split offset。
> `start_mode` 和 consumer group offset 只在首次启动,或为尚未存在 checkpoint
> 状态的新发现分区初始化位点时生效。
@@ -90,7 +90,7 @@ import ChangeLog from '../changelog/connector-kafka.md';
debezium_record_table_filter {
database_name = "test"
schema_name = "public" // null 如果不存在
- table_name = "products"
+ table_name = "products"
}
```