MirrerZu opened a new issue, #6831:
URL: https://github.com/apache/seatunnel/issues/6831

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   When I use MySQL JDBC source and Paimon sink under batch mode, Some data 
with the same primary key have not been updated correctly. However I'm sure 
it's a PK table using deduplicate merge.
   If I add `checkpoint.interval=1000` into job env config, **all the data can 
been updated correctly**, but job will throw exciption .
   **And I didn't find any errors or warnings in HDFS logs.**
   To verify, I used Flink's batch mode and JDBC external tables to write same 
data, all the data were updated correctly without any errors.
   
   ### SeaTunnel Version
   
   2.3.5
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     jdbc {
       url = "jdbc:mysql://10.1.xxx.xxx:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 180
        user = "root"
        password = "*****"
        table_path="test.test2"
     }
   }
   
   sink {
   
     Paimon {
       warehouse = "hdfs://10.1.xxx.xxx:9000/paimon"
       database = "mytest"
       table = "test2"
        paimon.table.write-props = {
           bucket = 1
       }
     }
    
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh --config ./config/v2.mysql.config -e local
   ```
   
   
   ### Error Exception
   
   ```log
   2024-05-10 17:32:01,333 WARN  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Exception in 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@6c0e24be
   java.lang.RuntimeException: java.lang.RuntimeException: 
java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
 ~[seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
 [seatunnel-starter.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
 [seatunnel-starter.jar:2.3.5]
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_402]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_402]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_402]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_402]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_402]
   Caused by: java.lang.RuntimeException: 
java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)
 ~[seatunnel-transforms-v2.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191)
 ~[seatunnel-starter.jar:2.3.5]
           ... 16 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
~[?:1.8.0_402]
           at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
~[?:1.8.0_402]
           at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)
 ~[seatunnel-transforms-v2.jar:2.3.5]
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191)
 ~[seatunnel-starter.jar:2.3.5]
           ... 16 more
   Caused by: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165)
 ~[connector-paimon-2.3.5.jar:2.3.5]
           at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)
 ~[seatunnel-transforms-v2.jar:2.3.5]
           ... 5 more
   Caused by: java.lang.IllegalStateException: BatchTableWrite only support 
one-time committing.
           at 
org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182) 
~[connector-paimon-2.3.5.jar:2.3.5]
           at 
org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201)
 ~[connector-paimon-2.3.5.jar:2.3.5]
           at 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157)
 ~[connector-paimon-2.3.5.jar:2.3.5]
           at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)
 ~[seatunnel-transforms-v2.jar:2.3.5]
           ... 5 more
   2024-05-10 17:32:01,339 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, 
taskId = 70000, taskGroup = TaskGroupLocation{jobId=841246167579754497, 
pipelineId=1, taskGroupId=50000}
   2024-05-10 17:32:01,340 WARN  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] Interrupted 
task 60000 - 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@49d2a9d8
   2024-05-10 17:32:01,340 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - [localhost]:5801 [seatunnel-387664] [5.1] taskDone, 
taskId = 60000, taskGroup = TaskGroupLocation{jobId=841246167579754497, 
pipelineId=1, taskGroupId=50000}
   2024-05-10 17:32:01,340 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - Release classloader for job 841246167579754497 with jars 
[file:/bigdata/seatunnel/connectors/connector-paimon-2.3.5.jar, 
file:/bigdata/seatunnel/connectors/connector-jdbc-2.3.5.jar]
   2024-05-10 17:32:01,349 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - recycle classloader for thread 
st-multi-table-sink-writer-1
   2024-05-10 17:32:01,350 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - recycle classloader for thread 
BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}
   2024-05-10 17:32:01,350 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000}] - recycle classloader for thread 
st-multi-table-sink-writer-2
   2024-05-10 17:32:01,351 INFO  [o.a.s.e.s.TaskExecutionService] 
[hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] 
Task TaskGroupLocation{jobId=841246167579754497, pipelineId=1, 
taskGroupId=50000} complete with state FAILED
   2024-05-10 17:32:01,351 INFO  [o.a.s.e.s.CoordinatorService  ] 
[hz.main.seaTunnel.task.thread-5] - [localhost]:5801 [seatunnel-387664] [5.1] 
Received task end from execution TaskGroupLocation{jobId=841246167579754497, 
pipelineId=1, taskGroupId=50000}, state FAILED
   2024-05-10 17:32:01,353 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] 
[hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), 
Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] turned 
from state RUNNING to FAILED.
   2024-05-10 17:32:01,353 INFO  [o.a.s.e.s.d.p.PhysicalVertex  ] 
[hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), 
Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] state 
process is stopped
   2024-05-10 17:32:01,353 ERROR [o.a.s.e.s.d.p.PhysicalVertex  ] 
[hz.main.seaTunnel.task.thread-5] - Job SeaTunnel_Job (841246167579754497), 
Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-jdbc]-SourceTask (1/1)] end 
with state FAILED and Exception: java.lang.RuntimeException: 
java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)
           at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
           at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
           at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703)
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004)
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.RuntimeException: 
java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)
           at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191)
           ... 16 more
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at java.util.concurrent.FutureTask.report(FutureTask.java:122)
           at java.util.concurrent.FutureTask.get(FutureTask.java:192)
           at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)
           ... 17 more
   Caused by: 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException:
 ErrorCode:[PAIMON-03], ErrorDescription:[Paimon pre commit failed] - Flink 
table store failed to prepare commit
           at 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:165)
           at 
org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)
           ... 5 more
   Caused by: java.lang.IllegalStateException: BatchTableWrite only support 
one-time committing.
           at 
org.apache.paimon.utils.Preconditions.checkState(Preconditions.java:182)
           at 
org.apache.paimon.table.sink.TableWriteImpl.prepareCommit(TableWriteImpl.java:201)
           at 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter.prepareCommit(PaimonSinkWriter.java:157)
           ... 6 more
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   Zeta 2.3.5
   
   ### Java or Scala Version
   
   jdk 1.8.0_402
   
   ### Screenshots
   
   
![image](https://github.com/apache/seatunnel/assets/43441555/d2505a05-b37c-46da-a091-6658d04ec4c3)
   
![image](https://github.com/apache/seatunnel/assets/43441555/fb93cc1e-4a40-4223-94bb-30339489dc7e)
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to