MirrerZu opened a new issue, #6831: URL: https://github.com/apache/seatunnel/issues/6831
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened When I use MySQL JDBC source and Paimon sink under batch mode, Some data with the same primary key have not been updated correctly. However I'm sure it's a PK table using deduplicate merge. If I add `checkpoint.interval=1000` into job env config, **all the data can been updated correctly**, but job will throw exciption . **And I didn't find any errors or warnings in HDFS logs.** To verify, I used Flink's batch mode and JDBC external tables to write same data, all the data were updated correctly without any errors. ### SeaTunnel Version 2.3.5 ### SeaTunnel Config ```conf env { parallelism = 1 job.mode = "BATCH" } source { jdbc { url = "jdbc:mysql://10.1.xxx.xxx:3306/test" driver = "com.mysql.cj.jdbc.Driver" connection_check_timeout_sec = 180 user = "root" password = "*****" table_path="test.test2" } } sink { Paimon { warehouse = "hdfs://10.1.xxx.xxx:9000/paimon" database = "mytest" table = "test2" paimon.table.write-props = { bucket = 1 } } } ``` ### Running Command ```shell ./bin/seatunnel.sh --config ./config/v2.mysql.config -e local ``` ### Error Exception ```log 2024-05-10 17:32:01,333 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@6c0e24be java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) [seatunnel-starter.jar:2.3.5] at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) [seatunnel-starter.jar:2.3.5] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_402] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_402] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_402] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_402] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402] Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) ~[seatunnel-transforms-v2.jar:2.3.5] at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5] ... 16 more Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_402] at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_402] at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ~[seatunnel-transforms-v2.jar:2.3.5] at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5] ... 16 more Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165) ~[connector-paimon-2.3.5.jar:2.3.5] at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5] ... 5 more Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing. at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182) ~[connector-paimon-2.3.5.jar:2.3.5] at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201) ~[connector-paimon-2.3.5.jar:2.3.5] at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157) ~[connector-paimon-2.3.5.jar:2.3.5] at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5] ... 5 more 2024-05-10 17:32:01,339 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 70000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000} 2024-05-10 17:32:01,340 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Interrupted task 60000 - org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@49d2a9d8 2024-05-10 17:32:01,340 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, taskId = 60000, taskGroup = TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000} 2024-05-10 17:32:01,340 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - Release classloader for job 841246167579754497 with jars [file:/bigdata/seatunnel/connectors/connector-paimon-2.3.5.jar, file:/bigdata/seatunnel/connectors/connector-jdbc-2.3.5.jar] 2024-05-10 17:32:01,349 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-1 2024-05-10 17:32:01,350 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000} 2024-05-10 17:32:01,350 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}] - recycle classloader for thread st-multi-table-sink-writer-2 2024-05-10 17:32:01,351 INFO [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Task TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000} complete with state FAILED 2024-05-10 17:32:01,351 INFO [o.a.s.e.s.CoordinatorService ] [hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] Received task end from execution TaskGroupLocation{jobId=841246167579754497, pipelineId=1, taskGroupId=50000}, state FAILED 2024-05-10 17:32:01,353 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] turned from state RUNNING to FAILED. 2024-05-10 17:32:01,353 INFO [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] state process is stopped 2024-05-10 17:32:01,353 ERROR [o.a.s.e.s.d.p.PhysicalVertex ] [hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] end with state FAILED and Exception: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ... 16 more Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ... 17 more Caused by: org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException: ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink table store failed to prepare commit at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165) at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ... 5 more Caused by: java.lang.IllegalStateException: BatchTableWrite only support one-time committing. at org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182) at org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201) at org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157) ... 6 more ``` ### Zeta or Flink or Spark Version Zeta 2.3.5 ### Java or Scala Version jdk 1.8.0_402 ### Screenshots   ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
