houjiakun opened a new issue, #8101:
URL: https://github.com/apache/seatunnel/issues/8101

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   The `SinkWriteCount` returned by the REST API `/hazelcast/rest/maps/finished-jobs` is inaccurate: it is always the same as `SourceReceivedCount`.

   During testing I created some duplicate data: the source table has 13 rows, one of which is a duplicate. The target table uses `id` as its primary key. After running the SeaTunnel job, 12 rows were written to the target table, and the remaining row failed to write because its primary key was duplicated.

   However, the `SinkWriteCount` returned by the API above is 13. How should I configure the job so that `SinkWriteCount` is correct?
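The numbers above can be reproduced with a small Python model. It assumes, without confirming it from the SeaTunnel source, that the sink-side counter is incremented as soon as a record is handed to the writer, while rows only reach MySQL when the buffered batch is flushed, and that the batch keeps going past the failing row (which matches the 12 rows reported). `FakePkTable`, `CountingBufferedWriter` and `run` are hypothetical names used only for illustration.

```python
class DuplicateKeyError(Exception):
    """Stands in for java.sql.SQLIntegrityConstraintViolationException."""


class FakePkTable:
    """Hypothetical target table with `id` as its primary key."""

    def __init__(self):
        self.rows = {}

    def insert(self, row):
        if row["id"] in self.rows:
            raise DuplicateKeyError(f"Duplicate entry '{row['id']}' for key 'PRIMARY'")
        self.rows[row["id"]] = row


class CountingBufferedWriter:
    """Hypothetical sink writer: counts on write(), touches the table only on flush()."""

    def __init__(self, table):
        self.table = table
        self.buffer = []
        self.write_count = 0  # plays the role of SinkWriteCount under the stated assumption

    def write(self, row):
        self.buffer.append(row)
        self.write_count += 1  # counted before the row has reached the database

    def flush(self):
        # Assumption: the batch continues past a failing row and reports the
        # error afterwards, matching the reported outcome (12 rows written).
        errors = []
        for row in self.buffer:
            try:
                self.table.insert(row)
            except DuplicateKeyError as exc:
                errors.append(exc)
        self.buffer.clear()
        if errors:
            raise errors[0]


def run(source_rows):
    """Return (counter value, rows actually in the table, whether the flush failed)."""
    table = FakePkTable()
    writer = CountingBufferedWriter(table)
    for row in source_rows:
        writer.write(row)
    failed = False
    try:
        writer.flush()
    except DuplicateKeyError:
        failed = True
    return writer.write_count, len(table.rows), failed


source_rows = [{"id": i} for i in range(1, 13)] + [{"id": 3}]  # 13 rows, id 3 twice
print(run(source_rows))  # (13, 12, True)
```

Under these assumptions the counter reflects every record the writer accepted, so it stays equal to the source count even when the flush fails.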
   
   
   ### SeaTunnel Version
   
   2.3.7
   
   ### SeaTunnel Config
   
   ```conf
   env {
       job.mode = "BATCH"
       job.name = "data_label"
       job.retry.times=0
       parallelism = 1
   }
   
   source {
       Jdbc {
           url = "jdbc:mysql://xxx:3306/test?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useUnicode=true&useSSL=false"
           driver = "com.mysql.jdbc.Driver"
           user = "root"
           password = "xxx"
           result_table_name = "data_label_src"
           query = "SELECT id AS id, label_name AS label_name, order_no AS order_no, is_del AS is_del, crt_time AS crt_time, crt_user AS crt_user, crt_name AS crt_name, crt_host AS crt_host, upd_time AS upd_time, upd_user AS upd_user, upd_name AS upd_name, upd_host AS upd_host, description AS description FROM data_label"
       }
   }
   
   sink {
       Jdbc {
           url = "jdbc:mysql://xxxx:3306/test1?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useUnicode=true&useSSL=false"
           driver = "com.mysql.jdbc.Driver"
           user = "root"
           password = "xxxx"
           source_table_name = "data_label_src"
           database = "test1"
           schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
           max_retries = 0
           table = "data_label"
           generate_sink_sql = true
       }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh --config test.config -m cluster
   ```
   
   
   ### Error Exception
   
   ```log
   Response from http://xxx:5801/hazelcast/rest/maps/finished-jobs:
   [
   {
           "jobId": "912151384986484738",
           "jobName": "data_label",
           "jobStatus": "FAILED",
           "errorMsg": "java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat
               org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:292)\n\tat
               org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:74)\n\tat
               org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)\n\tat
               org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)\n\tat
               org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70)\n\tat
               org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)\n\tat
               org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)\n\tat
               org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)\n\tat
               org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)\n\tat
               org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)\n\tat
               org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)\n\tat
               org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)\n\tat
               java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
               java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
               java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
               java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
               java.lang.Thread.run(Thread.java:750)\n
               Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat
               org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234)\n\tat
               org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:208)\n\t... 16 more\n
               Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat
               java.util.concurrent.FutureTask.report(FutureTask.java:122)\n\tat
               java.util.concurrent.FutureTask.get(FutureTask.java:192)\n\tat
               org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232)\n\t... 17 more\n
               Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed]\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:142)\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcExactlyOnceSinkWriter.prepareCurrentTx(JdbcExactlyOnceSinkWriter.java:202)\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcExactlyOnceSinkWriter.prepareCommit(JdbcExactlyOnceSinkWriter.java:144)\n\tat
               org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217)\n\t... 5 more\n
               Caused by: java.sql.BatchUpdateException: Duplicate entry '3' for key 'PRIMARY'\n\tat
               sun.reflect.GeneratedConstructorAccessor55.newInstance(Unknown Source)\n\tat
               sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\n\tat
               java.lang.reflect.Constructor.newInstance(Constructor.java:423)\n\tat
               com.mysql.cj.util.Util.handleNewInstance(Util.java:192)\n\tat
               com.mysql.cj.util.Util.getInstance(Util.java:167)\n\tat
               com.mysql.cj.util.Util.getInstance(Util.java:174)\n\tat
               com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)\n\tat
               com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)\n\tat
               com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)\n\tat
               com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:800)\n\tat
               com.mysql.cj.jdbc.StatementWrapper.executeBatch(StatementWrapper.java:545)\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.executeBatch(FieldNamedPreparedStatement.java:534)\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:51)\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:53)\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:172)\n\tat
               org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:136)\n\t... 8 more\n
               Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '3' for key 'PRIMARY'\n\tat
               com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)\n\tat
               com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)\n\tat
               com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)\n\tat
               com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1098)\n\tat
               com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1046)\n\tat
               com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1371)\n\tat
               com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:716)\n\t... 16 more\n",
           "createTime": "2024-11-22 09:23:57",
           "finishTime": "2024-11-22 09:23:59",
           "jobDag": "{\"jobId\":912151384986484738,\"pipelineEdges\":{\"1\":[{\"inputVertexId\":1,\"targetVertexId\":2}]},\"vertexInfoMap\":{\"1\":{\"vertexId\":1,\"type\":\"SOURCE\",\"connectorType\":\"pipeline-1 [Source[0]-Jdbc]\"},\"2\":{\"vertexId\":2,\"type\":\"SINK\",\"connectorType\":\"pipeline-1 [Sink[0]-Jdbc-MultiTableSink]\"}}}",
           "pluginJarsUrls": [],
           "metrics": {
               "TableSourceReceivedCount": {
                   "default.default.default": "13"
               },
               "SourceReceivedCount": "13",
               "TableSinkWriteCount": {
                   "default.default.default": "13"
               },
               "SinkWriteCount": "13"
           }
       }
   ]
   ```
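When many jobs are involved, the same mismatch can be screened for in the `finished-jobs` response. The Python sketch below reads only fields visible in the response above (`jobId`, `jobStatus`, `metrics.SourceReceivedCount`, `metrics.SinkWriteCount`) and flags jobs that ended in `FAILED` while reporting equal source and sink counts. `suspicious_jobs` is a hypothetical helper, and the counts are converted with `int()` because the API returns them as strings.

```python
import json


def suspicious_jobs(response_text):
    """Return jobIds of FAILED jobs whose sink count equals their source count."""
    flagged = []
    for job in json.loads(response_text):
        metrics = job.get("metrics", {})
        source = metrics.get("SourceReceivedCount")
        sink = metrics.get("SinkWriteCount")
        if source is None or sink is None:
            continue  # metrics missing: nothing to compare
        # A sink-side failure with a full sink count suggests the counter is not
        # tracking rows that actually reached the target table.
        if job.get("jobStatus") == "FAILED" and int(sink) == int(source):
            flagged.append(job["jobId"])
    return flagged


sample = '''[{"jobId": "912151384986484738", "jobName": "data_label",
  "jobStatus": "FAILED",
  "metrics": {"SourceReceivedCount": "13", "SinkWriteCount": "13"}}]'''
print(suspicious_jobs(sample))  # ['912151384986484738']
```

This is only a heuristic: a job can fail after its sink has genuinely written every row, so a flagged job still needs its target table checked.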
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   java-1.8.0_412
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

