Hisoka-X commented on code in PR #5992:
URL: https://github.com/apache/seatunnel/pull/5992#discussion_r1582051933
##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -35,6 +35,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
+| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
Review Comment:
```suggestion
| table_list | Map | No | - | Topic list config. You can configure only one `table_list` or one `topic` at the same time |
```
##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -180,3 +181,65 @@ source {
 }
 ```
+### Multiple Kafka Source
+
+> Currently, multiple kafka source reads are supported using the Zeta engine, but note that you can only configure one instance of `bootstrap.servers`, and only one in the `table_list` and `topic` parameters,Currently, when you use multiple topics in `table_list`, you are still free to set parameters for each `topic`
Review Comment:
```suggestion
> Currently, multiple kafka source reads are supported using the Zeta engine, but note that you can only configure one instance of `bootstrap.servers`, and only one of the `table_list` or `topic` parameters. Currently, when you use multiple topics in `table_list`, you are still free to set parameters for each `topic`
```
##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java:
##########
@@ -177,4 +177,10 @@ public class Config {
                     .defaultValue(KafkaSemantics.NON)
                     .withDescription(
                             "Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
+    public static final Option<List<Map<String, Object>>> TABLE_LIST =
+            Options.key("table_list")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "Topic list config You can configure only one `table_list` and one `topic` at the same time");
Review Comment:
```suggestion
                            "Topic list config. You can configure only one `table_list` or one `topic` at the same time");
```
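As context for the suggested wording: the option description promises that `table_list` and `topic` are mutually exclusive. A minimal, self-contained sketch of what such a check could look like (a hypothetical helper written for illustration, not the actual SeaTunnel validation code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the mutual exclusion the description
// promises; not the actual SeaTunnel validation code.
public class ConfigExclusionCheck {

    static void validate(Map<String, Object> config) {
        // Reject configs that set both keys at once.
        if (config.containsKey("topic") && config.containsKey("table_list")) {
            throw new IllegalArgumentException(
                    "You can configure only one of `table_list` or `topic` at the same time");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topic", "topic-1");
        validate(conf); // fine: only `topic` is set

        conf.put("table_list", "[...]");
        try {
            validate(conf); // now both are set, so this throws
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```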
##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -180,3 +181,65 @@ source {
 }
 ```
+### Multiple Kafka Source
+
+> Currently, multiple kafka source reads are supported using the Zeta engine, but note that you can only configure one instance of `bootstrap.servers`, and only one in the `table_list` and `topic` parameters,Currently, when you use multiple topics in `table_list`, you are still free to set parameters for each `topic`
+
+```hocon
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafka_e2e:9092"
+ table_list = [
+ {
+ topic = "^test-ogg-sou.*"
+ pattern = "true"
+ consumer.group = "ogg_multi_group"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = ogg_json
+ }, {
+ topic = "test-cdc_mds"
+ consumer.group = "canal_multi_group"
+ start_mode = earliest
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ description = "string"
+ weight = "string"
+ }
+ },
+ format = canal_json
+ }
+ ]
+ # Each topic in the `table_list` shares the config
+ kafka.config = {
+ client.id = client_1
+ max.poll.records = 50000
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+        // The current table name is the same as the topic name in order to maintain fewer configuration parameters
+ table-names = ["^test-ogg-sou.*","test-cdc_mds"]
+ }
+ }
+}
Review Comment:
How about adding a demo that writes data to Doris? It would be better to show how to use multi-table in one job.
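For reference, a rough sketch of what such a multi-table Doris sink demo might look like. The parameter names (`fenodes`, `username`, `password`, `database`, `table`) and the `${table_name}` placeholder are assumptions based on the Doris sink docs and should be verified there before use:

```hocon
sink {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "test"
    # Hypothetical multi-table routing: each upstream topic/table lands in
    # the Doris table whose name matches it.
    table = "${table_name}"
  }
}
```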
##########
seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java:
##########
@@ -69,12 +71,15 @@ public class CompatibleKafkaConnectDeserializationSchema
     /** Object mapper for parsing the JSON. */
     private final ObjectMapper objectMapper = new ObjectMapper();
+    private final CatalogTable catalogTable;
+
     public CompatibleKafkaConnectDeserializationSchema(
             @NonNull SeaTunnelRowType seaTunnelRowType,
             boolean keySchemaEnable,
             boolean valueSchemaEnable,
             boolean failOnMissingField,
-            boolean ignoreParseErrors) {
+            boolean ignoreParseErrors,
+            CatalogTable catalogTable) {
Review Comment:
Why do we need to use both `seaTunnelRowType` and `catalogTable` at the same time? I think we can replace `seaTunnelRowType` with `catalogTable`.
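The reviewer's point can be illustrated with simplified stand-in classes (the names mirror SeaTunnel's, but the bodies are invented for this sketch and the real API differs): when the table object already carries its own row type, passing both parameters is redundant.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins for the real SeaTunnel classes, written only to
// illustrate the reviewer's point; the actual API differs.
class SeaTunnelRowType {
    final List<String> fieldNames;
    SeaTunnelRowType(List<String> fieldNames) { this.fieldNames = fieldNames; }
}

class CatalogTable {
    private final SeaTunnelRowType rowType;
    CatalogTable(SeaTunnelRowType rowType) { this.rowType = rowType; }
    // The table already carries its schema, so callers need not pass it separately.
    SeaTunnelRowType getSeaTunnelRowType() { return rowType; }
}

class DeserializationSchema {
    private final SeaTunnelRowType rowType;

    // Single constructor argument: the row type is derived from the table,
    // making a separate SeaTunnelRowType parameter redundant.
    DeserializationSchema(CatalogTable catalogTable) {
        this.rowType = catalogTable.getSeaTunnelRowType();
    }

    SeaTunnelRowType rowType() { return rowType; }
}
```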
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]