houjiakun opened a new issue, #8101: URL: https://github.com/apache/seatunnel/issues/8101
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

The REST API `/hazelcast/rest/maps/finished-jobs` returns an inaccurate `SinkWriteCount`: it is always the same as `SourceReceivedCount`.

To test this, I created some duplicate data. The source table has 13 rows, one of which is a duplicate. The `id` column of the target table is its primary key. After running the SeaTunnel job, 12 rows were written to the target table, and the remaining row failed to write because its primary key was a duplicate. However, the `SinkWriteCount` returned by the API above is 13.

How should I configure the job so that `SinkWriteCount` is correct?

### SeaTunnel Version

2.3.7

### SeaTunnel Config

```conf
env {
  job.mode = "BATCH"
  job.name = "data_label"
  job.retry.times = 0
  parallelism = 1
}

source {
  Jdbc {
    url = "jdbc:mysql://xxx:3306/test?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useUnicode=true&useSSL=false"
    driver = "com.mysql.jdbc.Driver"
    user = "root"
    password = "xxx"
    result_table_name = "data_label_src"
    query = "SELECT id as id, label_name AS label_name,order_no as order_no,is_del AS is_del,crt_time AS crt_time,crt_user AS crt_user,crt_name AS crt_name,crt_host AS crt_host,upd_time AS upd_time,upd_user AS upd_user,upd_name AS upd_name,upd_host AS upd_host,description AS description FROM data_label"
  }
}

sink {
  Jdbc {
    url = "jdbc:mysql://xxxx:3306/test1?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useUnicode=true&useSSL=false"
    driver = "com.mysql.jdbc.Driver"
    user = "root"
    password = "xxxx"
    source_table_name = "data_label_src"
    database = "test1"
    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
    max_retries = 0
    table = "data_label"
    generate_sink_sql = true
  }
}
```

### Running Command

```shell
./bin/seatunnel.sh --config test.config -m cluster
```

### Error Exception

```log
http://xxx:5801/hazelcast/rest/maps/finished-jobs return message:
[
  {
    "jobId":
    "912151384986484738",
    "jobName": "data_label",
    "jobStatus": "FAILED",
    "errorMsg": "java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:292)\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:74)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)\n\tat org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)\n\tat org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)\n\tat org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)\n\tat org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)\n\tat org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)\n\tat org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)\n\tat org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)\n\tat org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:208)\n\t... 16 more\nCaused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat java.util.concurrent.FutureTask.report(FutureTask.java:122)\n\tat java.util.concurrent.FutureTask.get(FutureTask.java:192)\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)\n\t... 17 more\nCaused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:142)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcExactlyOnceSinkWriter.prepareCurrentTx(JdbcExactlyOnceSinkWriter.java:202)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcExactlyOnceSinkWriter.prepareCommit(JdbcExactlyOnceSinkWriter.java:144)\n\tat org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)\n\t... 5 more\nCaused by: java.sql.BatchUpdateException: Duplicate entry '3' for key 'PRIMARY'\n\tat sun.reflect.GeneratedConstructorAccessor55.newInstance(Unknown Source)\n\tat sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\n\tat java.lang.reflect.Constructor.newInstance(Constructor.java:423)\n\tat com.mysql.cj.util.Util.handleNewInstance(Util.java:192)\n\tat com.mysql.cj.util.Util.getInstance(Util.java:167)\n\tat com.mysql.cj.util.Util.getInstance(Util.java:174)\n\tat com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)\n\tat com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:800)\n\tat com.mysql.cj.jdbc.StatementWrapper.executeBatch(StatementWrapper.java:545)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:534)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:53)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:172)\n\tat org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:136)\n\t... 8 more\nCaused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '3' for key 'PRIMARY'\n\tat com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)\n\tat com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1098)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1046)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1371)\n\tat com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:716)\n\t... 16 more\n",
    "createTime": "2024-11-22 09:23:57",
    "finishTime": "2024-11-22 09:23:59",
    "jobDag": "{\"jobId\":912151384986484738,\"pipelineEdges\":{\"1\":[{\"inputVertexId\":1,\"targetVertexId\":2}]},\"vertexInfoMap\":{\"1\":{\"vertexId\":1,\"type\":\"SOURCE\",\"connectorType\":\"pipeline-1 [Source[0]-Jdbc]\"},\"2\":{\"vertexId\":2,\"type\":\"SINK\",\"connectorType\":\"pipeline-1 [Sink[0]-Jdbc-MultiTableSink]\"}}}",
    "pluginJarsUrls": [],
    "metrics": {
      "TableSourceReceivedCount": {
        "default.default.default": "13"
      },
      "SourceReceivedCount": "13",
      "TableSinkWriteCount": {
        "default.default.default": "13"
      },
      "SinkWriteCount": "13"
    }
  }
]

REST API /hazelcast/rest/maps/finished-jobs: the SinkWriteCount returned is inaccurate; it is always the same as SourceReceivedCount.
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

java-1.8.0_412

### Screenshots

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
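
The mismatch described in this report can be checked mechanically from a `finished-jobs` response. The following is a minimal sketch, not part of SeaTunnel: the helper name `suspicious_jobs` is hypothetical, and the sample payload is a trimmed copy of the response in this report that keeps only the fields the check reads (`jobId`, `jobStatus`, `metrics.SourceReceivedCount`, `metrics.SinkWriteCount`). It flags jobs that ended as `FAILED` yet report a sink count equal to the source count.

```python
import json

# Trimmed copy of the response from this report; only the fields used below are kept.
SAMPLE_RESPONSE = """
[
  {
    "jobId": "912151384986484738",
    "jobName": "data_label",
    "jobStatus": "FAILED",
    "metrics": {
      "SourceReceivedCount": "13",
      "SinkWriteCount": "13"
    }
  }
]
"""


def suspicious_jobs(response_text):
    """Return (jobId, source_count, sink_count) for FAILED jobs whose
    reported sink count equals the source count (hypothetical helper)."""
    flagged = []
    for job in json.loads(response_text):
        metrics = job.get("metrics", {})
        # The API returns the counters as strings, so convert before comparing.
        source_count = int(metrics.get("SourceReceivedCount", 0))
        sink_count = int(metrics.get("SinkWriteCount", 0))
        if (
            job.get("jobStatus") == "FAILED"
            and source_count > 0
            and sink_count == source_count
        ):
            flagged.append((job["jobId"], source_count, sink_count))
    return flagged


print(suspicious_jobs(SAMPLE_RESPONSE))
```

With the payload from this report, the job is flagged because it failed on a duplicate primary key while both counters read 13. To run the check against a live cluster, the text returned by the `/hazelcast/rest/maps/finished-jobs` endpoint could be passed in place of `SAMPLE_RESPONSE`.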
