exitNA opened a new issue, #8065: URL: https://github.com/apache/seatunnel/issues/8065
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

I deployed the Zeta engine in separated cluster mode with one master node and one worker node, and used MySQL-CDC to synchronize data to Doris. After running for a while, the job status showed FAILED. I restored the job manually, but shortly afterwards it showed as failed again. However, Doris was still receiving writes, and the log of the failed job was still being updated.

### SeaTunnel Version

2.3.8

### SeaTunnel Config

```conf
env {
  job.name = "stream-test"
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  checkpoint.timeout = 60000
  parallelism = 4
  read_limit.rows_per_second=100
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/testdb"
    username = "test_user"
    password = "test_pwd"
    snapshot.fetch.size = 200
    connection.pool.size = 4
    startup.mode = "initial"
    database-names = [
      "testdb"
    ]
    table-names = [
      "testdb.t1",
      "testdb.t2",
      "testdb.t3",
      "testdb.t4"
    ]
    debezium {
      include.schema.changes = true
    }
  }
}

sink {
  Doris {
    fenodes = "127.0.0.1:8030"
    query-port = 9030
    username = "test_user"
    password = "test_pwd"
    database = "${database_name}"
    table = "${table_name}"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    save_mode_create_template = """
    CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
      ${rowtype_primary_key},
      ${rowtype_fields},
      sync_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    ) ENGINE=OLAP
    UNIQUE KEY (${rowtype_primary_key})
    DISTRIBUTED BY HASH (${rowtype_primary_key})
    PROPERTIES (
      "replication_allocation" = "tag.location.default: 3",
      "in_memory" = "false",
      "storage_format" = "V2",
      "disable_auto_compaction" = "false"
    )
    """
  }
}
```

### Running Command

```shell
./bin/seatunnel.sh -c job/stream-test.config -r 909634676551843842 --async
```

### Error Exception

```log
org.apache.seatunnel.engine.common.exception.TaskGroupDeployException: java.lang.RuntimeException: TaskGroupLocation: TaskGroupLocation{jobId=909634676551843842, pipelineId=1, taskGroupId=1} already exists
	at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:337)
	at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:268)
	at org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation.runInternal(DeployTaskOperation.java:55)
	at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:44)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:574)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:403)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:317)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$19(SubPlan.java:643)
	at java.util.ArrayList.forEach(ArrayList.java:1249)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:639)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:633)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:624)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:608)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.restorePipeline(SubPlan.java:496)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:676)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

### Zeta or Flink or Spark Version

zeta: 2.3.8

### Java or Scala Version

jdk:1.8.0_111

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
