Hisoka-X commented on code in PR #5992:
URL: https://github.com/apache/seatunnel/pull/5992#discussion_r1582051933


##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -35,6 +35,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | Name       | Type   | Required | Default | Description |
 |------------|--------|----------|---------|-------------|
 | topic      | String | Yes      | -       | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
+| table_list | Map    | No       | -       | Topic list config You can configure only one `table_list` and one `topic` at the same time |

Review Comment:
   ```suggestion
   | table_list | Map    | No       | -       | Topic list config. You can configure only one `table_list` or one `topic` at the same time |
   ```



##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -180,3 +181,65 @@ source {
 }
 ```
 
+### Multiple Kafka Source
+
+> Currently, multiple kafka source reads are supported using the Zeta engine, but note that you can only configure one instance of `bootstrap.servers`, and only one in the `table_list` and `topic` parameters,Currently, when you use multiple topics in `table_list`, you are still free to set parameters for each `topic`

Review Comment:
   ```suggestion
   > Currently, multiple Kafka source reads are supported using the Zeta engine, but note that you can only configure one instance of `bootstrap.servers`, and only one of the `table_list` or `topic` parameters. Currently, when you use multiple topics in `table_list`, you are still free to set parameters for each `topic`.
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java:
##########
@@ -177,4 +177,10 @@ public class Config {
                     .defaultValue(KafkaSemantics.NON)
                     .withDescription(
                             "Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
+    public static final Option<List<Map<String, Object>>> TABLE_LIST =
+            Options.key("table_list")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "Topic list config You can configure only one `table_list` and one `topic` at the same time");

Review Comment:
   ```suggestion
                               "Topic list config. You can configure only one `table_list` or one `topic` at the same time");
   ```



##########
docs/en/connector-v2/source/kafka.md:
##########
@@ -180,3 +181,65 @@ source {
 }
 ```
 
+### Multiple Kafka Source
+
+> Currently, multiple kafka source reads are supported using the Zeta engine, but note that you can only configure one instance of `bootstrap.servers`, and only one in the `table_list` and `topic` parameters,Currently, when you use multiple topics in `table_list`, you are still free to set parameters for each `topic`
+
+```hocon
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafka_e2e:9092"
+    table_list = [
+      {
+        topic = "^test-ogg-sou.*"
+        pattern = "true"
+        consumer.group = "ogg_multi_group"
+        start_mode = earliest
+        schema = {
+          fields {
+            id = "int"
+            name = "string"
+            description = "string"
+            weight = "string"
+          }
+        },
+        format = ogg_json
+      }, {
+        topic = "test-cdc_mds"
+        consumer.group = "canal_multi_group"
+        start_mode = earliest
+        schema = {
+          fields {
+            id = "int"
+            name = "string"
+            description = "string"
+            weight = "string"
+          }
+        },
+        format = canal_json
+      }
+    ]
+ # Each topic in the `table_list` shares the config
+    kafka.config = {
+      client.id = client_1
+      max.poll.records = 50000
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      // The current table name is the same as the topic name in order to maintain fewer configuration parameters
+      table-names = ["^test-ogg-sou.*","test-cdc_mds"]
+    }
+  }
+}

Review Comment:
   How about adding a demo that writes the data to Doris? That would better show how to use multiple tables in one job.
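
   For illustration, the `sink` block of such a demo might look roughly like the sketch below. This is not a tested config. The option names (`fenodes`, `sink.label-prefix`, `sink.enable-2pc`, `doris.config`) and the `${table_name}` placeholder are assumptions based on the Doris sink documentation and should be checked against the connector version in use. The host, credentials, and database name are hypothetical.

   ```hocon
   sink {
     Doris {
       # Hypothetical FE address and credentials
       fenodes = "doris_e2e:8030"
       username = root
       password = ""
       # Hypothetical target database
       database = "test_db"
       # Assumption: the placeholder resolves to each upstream table name,
       # which for this source is the same as the topic name
       table = "${table_name}"
       sink.label-prefix = "test-kafka-multi"
       sink.enable-2pc = "true"
       doris.config {
         format = "json"
         read_json_by_line = "true"
       }
     }
   }
   ```

   With a block like this, each table produced by `table_list` would be written to its own Doris table within the same job, provided the placeholder is supported.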



##########
seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/CompatibleKafkaConnectDeserializationSchema.java:
##########
@@ -69,12 +71,15 @@ public class CompatibleKafkaConnectDeserializationSchema
     /** Object mapper for parsing the JSON. */
     private final ObjectMapper objectMapper = new ObjectMapper();
 
+    private final CatalogTable catalogTable;
+
     public CompatibleKafkaConnectDeserializationSchema(
             @NonNull SeaTunnelRowType seaTunnelRowType,
             boolean keySchemaEnable,
             boolean valueSchemaEnable,
             boolean failOnMissingField,
-            boolean ignoreParseErrors) {
+            boolean ignoreParseErrors,
+            CatalogTable catalogTable) {

Review Comment:
   Why do we need both `seaTunnelRowType` and `catalogTable` at the same time? I think we can replace `seaTunnelRowType` with `catalogTable`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
