W-dragan opened a new issue, #6593: URL: https://github.com/apache/seatunnel/issues/6593
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

When I use a MySQL upstream whose primary key `ID` is `bigint` and an Oracle downstream whose primary key `ID` is `nvarchar`, and set `"primary_keys": ["ID"]` on the sink, the job reports `ORA-01722: invalid number`.

<img width="272" alt="oracle_id_error" src="https://github.com/apache/seatunnel/assets/69575498/efdbc1de-9a01-4438-9a1f-074b7f9d4881">

Although this is a limitation at the Oracle level, converting numbers to strings should be a universally supported scenario. When the sink writes data, the data type should be based on the sink table rather than on the type output by the transform.

### SeaTunnel Version

2.3.4

### SeaTunnel Config

```conf
{
  "env" : {
    "execution.parallelism" : 1,
    "job.mode" : "BATCH"
  },
  "source" : [ {
    "password" : "123456",
    "driver" : "com.mysql.cj.jdbc.Driver",
    "query" : "select * from md_d_wl001",
    "connection_check_timeout_sec" : 100,
    "fetch_size" : 1000,
    "result_table_name" : "result_0",
    "plugin_name" : "jdbc",
    "user" : "root",
    "url" : "jdbc:mysql://127.0.0.1:3307/seatunnel?useSSL=false&serverTimezone=GMT%2B8"
  } ],
  "transform" : [ {
    "query": "SELECT id as ID,tenant_id as SITE_CODE,parent_id as LEVEL_CODE,group_text as GROUP_TEXT,parent_id as PARENT_ID,create_by as CREATE_BY,create_by_id as CREATE_BY_ID,create_time as CREATE_TIME,last_update_by as LAST_UPDATE_BY,last_update_by_id as LAST_UPDATE_BY_ID,last_update_time as LAST_UPDATE_TIME,group_code as GROUP_CODE FROM result_0",
    "source_table_name": "result_0",
    "result_table_name": "source_1",
    "plugin_name": "sql"
  } ],
  "sink" : [ {
    "password" : "xxx",
    "database" : "xxx",
    "driver" : "oracle.jdbc.driver.OracleDriver",
    "source_table_name" : "source_1",
    "generate_sink_sql" : true,
    "primary_keys":["ID"],
    "plugin_name" : "jdbc",
    "user" : "xxx",
    "url" : "jdbc:oracle:thin:@//127.0.0.1:1521/MONGOPDB",
    "table" : "xxx.CM_MATERIAL_GROUP"
  } ]
}
```

### Running Command

```shell
http submit
```

### Error Exception

```log
2024-03-27 17:49:08,101 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement - PrepareStatement sql is: MERGE INTO "DATAOPS"."CM_MATERIAL_GROUP" TARGET USING (SELECT ? "ID", ? "SITE_CODE", ? "LEVEL_CODE", ? "GROUP_TEXT", ? "PARENT_ID", ? "CREATE_BY", ? "CREATE_BY_ID", ? "CREATE_TIME", ? "LAST_UPDATE_BY", ? "LAST_UPDATE_BY_ID", ? "LAST_UPDATE_TIME", ? "GROUP_CODE" FROM DUAL) SOURCE ON (TARGET."ID"=SOURCE."ID") WHEN MATCHED THEN UPDATE SET TARGET."SITE_CODE"=SOURCE."SITE_CODE", TARGET."LEVEL_CODE"=SOURCE."LEVEL_CODE", TARGET."GROUP_TEXT"=SOURCE."GROUP_TEXT", TARGET."PARENT_ID"=SOURCE."PARENT_ID", TARGET."CREATE_BY"=SOURCE."CREATE_BY", TARGET."CREATE_BY_ID"=SOURCE."CREATE_BY_ID", TARGET."CREATE_TIME"=SOURCE."CREATE_TIME", TARGET."LAST_UPDATE_BY"=SOURCE."LAST_UPDATE_BY", TARGET."LAST_UPDATE_BY_ID"=SOURCE."LAST_UPDATE_BY_ID", TARGET."LAST_UPDATE_TIME"=SOURCE."LAST_UPDATE_TIME", TARGET."GROUP_CODE"=SOURCE."GROUP_CODE" WHEN NOT MATCHED THEN INSERT ("ID", "SITE_CODE", "LEVEL_CODE", "GROUP_TEXT", "PARENT_ID", "CREATE_BY", "CREATE_BY_ID", "CREATE_TIME", "LAST_UPDATE_BY", "LAST_UPDATE_BY_ID" , "LAST_UPDATE_TIME", "GROUP_CODE") VALUES (SOURCE."ID", SOURCE."SITE_CODE", SOURCE."LEVEL_CODE", SOURCE."GROUP_TEXT", SOURCE."PARENT_ID", SOURCE."CREATE_BY", SOURCE."CREATE_BY_ID", SOURCE."CREATE_TIME", SOURCE."LAST_UPDATE_BY", SOURCE."LAST_UPDATE_BY_ID", SOURCE."LAST_UPDATE_TIME", SOURCE."GROUP_CODE")
2024-03-27 17:49:08,102 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement - PrepareStatement sql is: DELETE FROM "DATAOPS"."CM_MATERIAL_GROUP" WHERE "ID" = ?
2024-03-27 17:49:08,112 ERROR org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat - JDBC executeBatch error, retry times = 0
java.sql.BatchUpdateException: ORA-01722: 无效数字
    at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9723) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9499) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127) ~[HikariCP-4.0.3.jar:?]
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java) ~[HikariCP-4.0.3.jar:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:533) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor.executeBatch(BufferReducedBatchStatementExecutor.java:89) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:167) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:131) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:137) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:185) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:175) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:64) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) [classes/:?]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) [classes/:?]
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) [classes/:?]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) [classes/:?]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) [classes/:?]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) [classes/:?]
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:648) [classes/:?]
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:949) [classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
2024-03-27 17:49:08,113 WARN org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@22d486e0
java.lang.RuntimeException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:246) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:64) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[classes/:?]
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:648) [classes/:?]
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:949) [classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:137) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:137) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:185) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:175) ~[classes/:?]
    ... 17 more
Caused by: java.sql.BatchUpdateException: ORA-01722: 无效数字
    at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9723) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9499) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237) ~[ojdbc8-19.18.0.0.jar:19.18.0.0.0]
    at com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127) ~[HikariCP-4.0.3.jar:?]
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java) ~[HikariCP-4.0.3.jar:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:533) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor.executeBatch(BufferReducedBatchStatementExecutor.java:89) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:167) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:131) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:137) ~[classes/:?]
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:185) ~[classes/:?]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:175) ~[classes/:?]
    ... 17 more
2024-03-27 17:49:08,114 WARN org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel] [5.1] Interrupted task 40000 - org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@5b8932d3
2024-03-27 17:49:08,117 ERROR org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (825303504432463873), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=825303504432463873, pipelineId=1, taskGroupId=30000}] end with state FAILED and Exception: java.lang.RuntimeException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:246)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:64)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
    at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
    at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
    at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:648)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:949)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]
    at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:137)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:137)
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:185)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:175)
    ... 17 more
Caused by: java.sql.BatchUpdateException: ORA-01722: 无效数字
```

### Zeta or Flink or Spark Version

2.3.4

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
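
A possible workaround until the sink converts types itself, given only as an untested sketch: the `MERGE` statement in the log above matches rows with `TARGET."ID"=SOURCE."ID"`. If `SOURCE."ID"` is bound as a number, Oracle presumably converts the character column `TARGET."ID"` to a number for the comparison, which raises ORA-01722 as soon as an existing `ID` value is not numeric. Casting `id` to a string in the SQL transform should make the sink bind a string instead. This assumes that the SQL transform in 2.3.4 supports `CAST(... AS STRING)` as described in the transform-v2 SQL functions documentation; the rest of the transform block is copied unchanged from the config above.

```conf
"transform" : [ {
  "query": "SELECT CAST(id AS STRING) as ID,tenant_id as SITE_CODE,parent_id as LEVEL_CODE,group_text as GROUP_TEXT,parent_id as PARENT_ID,create_by as CREATE_BY,create_by_id as CREATE_BY_ID,create_time as CREATE_TIME,last_update_by as LAST_UPDATE_BY,last_update_by_id as LAST_UPDATE_BY_ID,last_update_time as LAST_UPDATE_TIME,group_code as GROUP_CODE FROM result_0",
  "source_table_name": "result_0",
  "result_table_name": "source_1",
  "plugin_name": "sql"
} ]
```

Any other column whose source type is numeric and whose Oracle column is a character type would need the same cast if it takes part in a comparison. This only sidesteps the problem per job; it does not replace the sink-side conversion requested here.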
