houbb opened a new issue, #6302: URL: https://github.com/apache/seatunnel/issues/6302
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

BUG: With the MySQL CDC source, configuring a single table in `table-names` works fine. Configuring multiple tables fails with `org.apache.kafka.connect.errors.DataException: xxx is not a valid field name`. When `table-names` lists several tables to monitor, the local examples test fails immediately; when only one table is listed, there is no problem.

Run Mode: local; the config file is run directly from seatunnel-examples.

Ask: How can this problem be fixed? I think mysql-cdc should support configuring multiple tables in one config.

### SeaTunnel Version

Apache SeaTunnel version: v2.3.3

MySQL version: 5.7.31-log

### SeaTunnel Config

```conf
# For simplicity, the sink is Console.

# Defining the runtime environment
env {
  # You can set flink configuration here
  parallelism = 1
  job.mode = "STREAMING"
  job.name = "merge_cdc.user_info-STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/cdc?useSSL=false&serverTimezone=Asia/Shanghai"
    driver = "com.mysql.jdbc.Driver"
    username = "admin"
    password = "123456"
    table-names = ["cdc.user_info", "cdc.role_info"]
    startup.mode = "initial"
    result_table_name = "merge_cdc.user_info"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {
  }
}
```

### Running Command

```java
// Run Mode: local; the config file is run directly from seatunnel-examples.
public static void main(String[] args)
        throws FileNotFoundException, URISyntaxException, CommandException {
    String configurePath = args.length > 0 ?
            args[0] : "/examples/cdc/mysql_cdc_to_file_multitables.conf";
    String configFile = getTestConfigFile(configurePath);
    ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
    clientCommandArgs.setConfigFile(configFile);
    clientCommandArgs.setCheckConfig(false);
    clientCommandArgs.setJobName(Paths.get(configFile).getFileName().toString());
    // Change Execution Mode to CLUSTER to use client mode, before do this, you should start
    // SeaTunnelEngineServerExample
    clientCommandArgs.setMasterType(MasterType.LOCAL);
    SeaTunnel.run(clientCommandArgs.buildCommand());
}
```

### Error Exception

```log
2024-01-29 18:00:26,226 ERROR org.apache.seatunnel.core.starter.SeaTunnel -
===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.example.engine.cdc.MysqlCdcDefaultToLocalFileMultiTablesExample.main(MysqlCdcDefaultToLocalFileMultiTablesExample.java:44)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: username is not a valid field name
    at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:84)
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractAfterRow(SeaTunnelRowDebeziumDeserializeSchema.java:209)
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:178)
    at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:110)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:155)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:130)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:89)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:55)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:108)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:184)
    ... 2 more
```

### Zeta or Flink or Spark Version

default version

### Java or Scala Version

>java -version
java version "1.8.0_371"
Java(TM) SE Runtime Environment (build 1.8.0_371-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.371-b11, mixed mode)

### Screenshots

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
