houbb opened a new issue, #6302:
URL: https://github.com/apache/seatunnel/issues/6302

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   BUG: mysql cdc table-names config one table, works fine. But config multi 
tables, meets org.apache.kafka.connect.errors.DataException: xxx is not a valid 
field name(mysql cdc 当 table-names 指定监听多张表的时候,examples 本地测试直接失败。只指定一张表的时候,不存在问题)
   
   Run Mode: local seatunnel-examples run config file directly.
   
   Ask: How to fix this problem? I think mysql-cdc should support config 
multi-tables in one config.
   
   
   ### SeaTunnel Version
   
   Apache seatunnel version:v2.3.3
   
   Mysql version: 5.7.31-log
   
   ### SeaTunnel Config
   
   ```conf
   For simple, sink is console.
   
   
   # Defining the runtime environment
   env {
     # You can set flink configuration here
     parallelism = 1
     job.mode = "STREAMING"
     job.name = "merge_cdc.user_info-STREAMING"
     checkpoint.interval = 10000
   }
   source{
       MySQL-CDC {
           base-url = 
"jdbc:mysql://127.0.0.1:3306/cdc?useSSL=false&serverTimezone=Asia/Shanghai"
           driver = "com.mysql.jdbc.Driver"
           username = "admin"
           password = "123456"
           table-names = ["cdc.user_info", "cdc.role_info"]
   
           startup.mode = "initial"
           result_table_name="merge_cdc.user_info"
       }
   }
   
   transform {
       # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
       # please go to https://seatunnel.apache.org/docs/transform-v2/sql
   }
   
   sink {
       Console {
       }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   Run Mode: local seatunnel-examples run config file directly.
   
   
   public static void main(String[] args)
               throws FileNotFoundException, URISyntaxException, 
CommandException {
           String configurePath = args.length > 0 ? args[0] : 
"/examples/cdc/mysql_cdc_to_file_multitables.conf";
           String configFile = getTestConfigFile(configurePath);
           ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
           clientCommandArgs.setConfigFile(configFile);
           clientCommandArgs.setCheckConfig(false);
           
clientCommandArgs.setJobName(Paths.get(configFile).getFileName().toString());
           // Change Execution Mode to CLUSTER to use client mode, before do 
this, you should start
           // SeaTunnelEngineServerExample
           clientCommandArgs.setMasterType(MasterType.LOCAL);
           SeaTunnel.run(clientCommandArgs.buildCommand());
       }
   ```
   
   
   ### Error Exception
   
   ```log
   2024-01-29 18:00:26,226 ERROR org.apache.seatunnel.core.starter.SeaTunnel - 
   
===============================================================================
   
   
   
   Exception in thread "main" 
org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel 
job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.example.engine.cdc.MysqlCdcDefaultToLocalFileMultiTablesExample.main(MysqlCdcDefaultToLocalFileMultiTablesExample.java:44)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.kafka.connect.errors.DataException: username is not a valid field 
name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
        at 
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:84)
        at 
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractAfterRow(SeaTunnelRowDebeziumDeserializeSchema.java:209)
        at 
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:178)
        at 
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:110)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:155)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:130)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98)
        at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
        at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
        at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
        at 
org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
        ... 2 more
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   default version
   
   ### Java or Scala Version
   
   >java -version
   java version "1.8.0_371"
   Java(TM) SE Runtime Environment (build 1.8.0_371-b11)
   Java HotSpot(TM) 64-Bit Server VM (build 25.371-b11, mixed mode)
   
   ### Screenshots
   
   
![username_config](https://github.com/apache/seatunnel/assets/18375710/3e743456-1c48-4370-9359-d851061d515a)
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to