hailin0 commented on code in PR #8391: URL: https://github.com/apache/seatunnel/pull/8391#discussion_r1903279716
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java:
##########
@@ -105,6 +106,12 @@ public class Config {
                    .defaultValue(true)
                    .withDescription("Does the debezium record carry a schema.");
+    public static final Option<TableSchemaOptions.TableIdentifier> DEBEZIUM_RECORD_TABLE_FILTER =
Review Comment:
add into `KafkaSourceFactory` optionRule?
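For context, the reviewer is asking for the new option to be registered in the factory's option rule so that it participates in config validation. A minimal sketch of what that might look like, assuming `KafkaSourceFactory` follows the usual `OptionRule.builder()` pattern (everything except `Config.DEBEZIUM_RECORD_TABLE_FILTER` is illustrative, not taken from the PR):

```java
// Hypothetical sketch, not the actual PR code: register the new option in
// KafkaSourceFactory#optionRule() so the connector's option validation sees it.
@Override
public OptionRule optionRule() {
    return OptionRule.builder()
            .required(Config.BOOTSTRAP_SERVERS)                // illustrative required option
            .optional(Config.DEBEZIUM_RECORD_TABLE_FILTER)     // the option added in this PR
            .build();
}
```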
##########
docs/en/connector-v2/source/Kafka.md:
##########
@@ -32,27 +32,42 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
## Source Options
-| Name | Type | Required | Default | Description |
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------|
-| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
-| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
-| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
-| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
-| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
-| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
-| poll.timeout | Long | No | 10000 | The interval(millis) for poll messages. |
-| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
-| schema | Config | No | - | The structure of the data, including field names and field types. |
-| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. Some format details please refer [formats](../formats) |
-| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. |
-| field_delimiter | String | No | , | Customize the field delimiter for data format. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------|
+| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
+| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
+| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
+| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
+| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
+| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
+| poll.timeout | Long | No | 10000 | The interval(millis) for poll messages. |
+| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
+| schema | Config | No | - | The structure of the data, including field names and field types. |
+| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. Some format details please refer [formats](../formats) |
+| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. |
+| debezium_record_table_filter | Config | No | - | Used for filtering data in debezium format, only when the format is set to `debezium_json`. Please refer `debezium_record_table_filter` below |
+| field_delimiter | String | No | , | Customize the field delimiter for data format. |
| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. |
-| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
-| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
-| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
-| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
-| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
-| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
+| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
+| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
+| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
+| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
+| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
+| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
+
+### debezium_record_table_filter
+
+We can use `debezium_record_table_filter` to filter the data in the debezium format. The configuration is as follows:
+
+```hocon
+debezium_record_table_filter {
+    database_name = "test"
Review Comment:
```suggestion
database_name = "test" // null if not exists
```
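Building on the suggestion above, a fuller filter block might look like the following. This is a sketch only: the quoted snippet is truncated at the review anchor, so the `schema_name` and `table_name` keys (and all values) are illustrative assumptions, not confirmed by the PR:

```hocon
// Hypothetical example of debezium_record_table_filter; only database_name
// appears in the quoted diff, the other keys are assumed for illustration.
debezium_record_table_filter {
    database_name = "test"     // null if not exists
    schema_name = "public"     // assumed key
    table_name = "products"    // assumed key
}
```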
