ricky2129 opened a new issue, #10568: URL: https://github.com/apache/seatunnel/issues/10568
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

- SeaTunnel Version: 2.3.12
- Deployment: Kubernetes (Helm), Zeta engine, separated cluster mode
- IMap backend: S3 (EAGER loading mode)
- Trigger: Kubernetes rolling update of master pods (workers stay up)

During a master pod rolling restart, CDC jobs show **FAILED** in the UI and REST API while they are **still actively writing data** to the sink. This error is thrown repeatedly on the workers:

```log
org.apache.seatunnel.engine.common.exception.TaskGroupDeployException: java.lang.RuntimeException: TaskGroupLocation: TaskGroupLocation{jobId=************, pipelineId=1, taskGroupId=2} already exists
	at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:336)
```

During a master rolling restart, workers keep their tasks running. The new master reads from the S3 IMap that jobs are `RUNNING` and calls `restoreAllRunningJobFromMasterNodeSwitch()`. This creates fresh `PhysicalVertex` objects whose *local* `currExecutionState` is initialized to `CREATED`, while the S3 IMap already has `RUNNING` for those tasks. The DEPLOYING case of `SubPlan.stateProcess()` checks `task.getExecutionState()`, which reads the *local* field (= `CREATED`), not the IMap, so the check passes and `makeTaskGroupDeploy()` is called. Inside, `updateTaskState(DEPLOYING)` reads the IMap (`RUNNING` is not an end state) and transitions to `DEPLOYING`. Then `deploy()` sends a `DeployTaskOperation` to the worker. The worker's `TaskExecutionService.deployTask()` finds the `TaskGroupLocation` already in `executionContexts` (the task IS running) and throws "already exists".
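The divergence between the restored master's local state and the persisted IMap state can be illustrated with a minimal standalone sketch. This is a toy model, not SeaTunnel's real `PhysicalVertex`/`SubPlan` code: a plain `Map` stands in for the S3-backed Hazelcast IMap, and only the names (`currExecutionState`, `getExecutionState`) mirror the issue text.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the state divergence after a master failover. Hypothetical model:
// the real PhysicalVertex and IMap interaction in SeaTunnel is more involved.
public class StateDivergenceSketch {
    enum ExecutionState { CREATED, DEPLOYING, RUNNING }

    // Simulated S3-backed IMap: it survives the master restart and still says RUNNING.
    static final Map<String, ExecutionState> imap = new HashMap<>();

    static class PhysicalVertex {
        // Fresh object built during restore: the *local* field starts at CREATED
        // regardless of what the IMap says.
        ExecutionState currExecutionState = ExecutionState.CREATED;

        ExecutionState getExecutionState() {
            return currExecutionState; // reads the local field, not the IMap
        }
    }

    public static void main(String[] args) {
        imap.put("task-2", ExecutionState.RUNNING);     // state persisted before the restart
        PhysicalVertex restored = new PhysicalVertex(); // new master rebuilds the vertex

        // The DEPLOYING-case guard consults the local field: it sees CREATED,
        // so the master redeploys even though the worker is still running the task.
        boolean willRedeploy = restored.getExecutionState() == ExecutionState.CREATED;
        System.out.println("IMap state:  " + imap.get("task-2"));            // RUNNING
        System.out.println("local state: " + restored.getExecutionState());  // CREATED
        System.out.println("redeploys anyway: " + willRedeploy);             // true
    }
}
```

The mismatch between the two reads is exactly what lets the deploy proceed and collide with the still-running task on the worker.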
This causes an infinite failure cascade: deploy fails → makeTaskGroupFailing → PhysicalVertex FAILED → addPhysicalVertexCallBack → updatePipelineState(FAILING) → stateProcess() FAILED → checkNeedRestore → restorePipeline() → deploy again → "already exists" → loop.

## Steps to Reproduce

1. Deploy SeaTunnel on Kubernetes with an S3-backed IMap (EAGER mode)
2. Start one or more CDC streaming jobs
3. Trigger a master rolling update (`kubectl rollout restart deployment/<master>`)
4. Observe: jobs transition to FAILED in the UI, and workers log "already exists" repeatedly
5. Jobs are still writing to the sink despite showing FAILED

## Expected Behavior

Jobs remain RUNNING through a master rolling restart. The new master reconnects to the already-running tasks on the workers without interrupting them.

## Proposed Fix

Make `TaskExecutionService.deployTask()` idempotent for the master failover case. When a `TaskGroupLocation` is found in `executionContexts` (the task is actively running, i.e. not in `finishedExecutionContexts`), return success instead of throwing. The worker notifies the master of the terminal state via `NotifyTaskStatusOperation` (using the dynamic `getMasterAddress()`) when the task eventually completes, maintaining correct lifecycle tracking. Legitimate redeployment is unaffected: completed tasks are moved to `finishedExecutionContexts` by `taskDone()`, so `executionContexts.containsKey()` returns false and a normal deploy proceeds.

Fix submitted in PR #10567.
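For illustration, the idempotent guard described above could look roughly like this. It is a self-contained sketch, not the actual PR: a `String` key stands in for `TaskGroupLocation`, a bare `Object` stands in for the execution context, and the real `TaskExecutionService` does considerably more bookkeeping.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of an idempotent deployTask(): redeploying an already
// running task group succeeds instead of throwing "already exists".
public class IdempotentDeploySketch {
    enum DeployResult { SUCCESS, ALREADY_RUNNING }

    private final ConcurrentMap<String, Object> executionContexts = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> finishedExecutionContexts = new ConcurrentHashMap<>();

    DeployResult deployTask(String taskGroupLocation) {
        if (executionContexts.containsKey(taskGroupLocation)) {
            // Master failover case: the task group is already running on this
            // worker, so report success idempotently rather than throwing.
            return DeployResult.ALREADY_RUNNING;
        }
        executionContexts.put(taskGroupLocation, new Object());
        return DeployResult.SUCCESS;
    }

    void taskDone(String taskGroupLocation) {
        // Completed tasks move to finishedExecutionContexts, so a later
        // deploy of the same location is a normal (non-failover) deploy.
        Object ctx = executionContexts.remove(taskGroupLocation);
        if (ctx != null) {
            finishedExecutionContexts.put(taskGroupLocation, ctx);
        }
    }

    public static void main(String[] args) {
        IdempotentDeploySketch worker = new IdempotentDeploySketch();
        String loc = "jobId=1/pipelineId=1/taskGroupId=2";
        System.out.println(worker.deployTask(loc)); // SUCCESS: first deploy
        System.out.println(worker.deployTask(loc)); // ALREADY_RUNNING: failover redeploy, no exception
        worker.taskDone(loc);
        System.out.println(worker.deployTask(loc)); // SUCCESS: legitimate redeploy after completion
    }
}
```

The key property is in the last two calls: completion clears the entry from `executionContexts`, so legitimate redeployment is indistinguishable from a first deploy, while a failover redeploy of a live task is a no-op success.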
### SeaTunnel Version

2.3.12

### SeaTunnel Config

```conf
apiVersion: v1
kind: ConfigMap
metadata:
  name: <release-name>-configs
  namespace: seatunnel
  labels:
    app.kubernetes.io/name: <release-name>-configs
    app.kubernetes.io/instance: <release-name>
data:
  hazelcast-client.yaml: |
    hazelcast-client:
      cluster-name: <release-name>
      properties:
        hazelcast.logging.type: log4j2
      connection-strategy:
        connection-retry:
          cluster-connect-timeout-millis: 3000
      network:
        cluster-members:
          - <release-name>.seatunnel.svc.cluster.local:5801
  hazelcast-master.yaml: |
    hazelcast:
      cluster-name: <release-name>
      network:
        rest-api:
          enabled: true
          endpoint-groups:
            CLUSTER_WRITE:
              enabled: true
            DATA:
              enabled: true
        join:
          kubernetes:
            enabled: true
            service-dns: <release-name>.seatunnel.svc.cluster.local
            service-port: 5801
        port:
          auto-increment: false
          port: 5801
      properties:
        hazelcast.invocation.max.retry.count: 50
        hazelcast.tcp.join.port.try.count: 30
        hazelcast.logging.type: log4j2
        hazelcast.operation.generic.thread.count: 50
        hazelcast.heartbeat.failuredetector.type: phi-accrual
        hazelcast.heartbeat.interval.seconds: 2
        hazelcast.max.no.heartbeat.seconds: 300
        hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
        hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
        hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
        hazelcast.operation.call.timeout.millis: 180000
        hazelcast.connection.timeout.seconds: 60
      map:
        engine*:
          map-store:
            enabled: true
            initial-mode: EAGER
            factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
            properties:
              type: hdfs
              namespace: /<release-name>/imap/
              clusterName: <release-name>
              storage.type: s3
              s3.bucket: s3a://<your-s3-bucket>
              fs.s3a.endpoint: s3.<your-region>.amazonaws.com
              fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider
  hazelcast-worker.yaml: |
    hazelcast:
      cluster-name: <release-name>
      network:
        rest-api:
          enabled: true
          endpoint-groups:
            CLUSTER_WRITE:
              enabled: true
            DATA:
              enabled: true
        join:
          kubernetes:
            enabled: true
            service-dns: <release-name>.seatunnel.svc.cluster.local
            service-port: 5801
        port:
          auto-increment: false
          port: 5801
      properties:
        hazelcast.invocation.max.retry.count: 50
        hazelcast.tcp.join.port.try.count: 30
        hazelcast.logging.type: log4j2
        hazelcast.operation.generic.thread.count: 50
        hazelcast.heartbeat.failuredetector.type: phi-accrual
        hazelcast.heartbeat.interval.seconds: 2
        hazelcast.max.no.heartbeat.seconds: 300
        hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
        hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
        hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
        hazelcast.operation.call.timeout.millis: 180000
        hazelcast.connection.timeout.seconds: 60
      member-attributes:
        rule:
          type: string
          value: worker
  jvm_client_options: |
    -Xms256m
    -Xmx512m
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client
  jvm_master_options: |
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
    -XX:MaxMetaspaceSize=2g
    -XX:+UseG1GC
  jvm_worker_options: |
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
    -XX:MaxMetaspaceSize=2g
    -XX:+UseG1GC
  log4j2.properties: |
    monitorInterval = 60
    rootLogger.level = info
    rootLogger.appenderRef.console.ref = consoleAppender
    logger.engine.name = org.apache.seatunnel.engine
    logger.engine.level = info
    logger.debezium.name = io.debezium.connector
    logger.debezium.level = warn
    appender.console.type = Console
    appender.console.name = consoleAppender
    appender.console.target = SYSTEM_OUT
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{ISO8601} %-5p [%t] %c{2}: %m%n
  seatunnel.yaml: |
    seatunnel:
      engine:
        history-job-expire-minutes: 1440
        backup-count: 1
        queue-type: blockingqueue
        print-execution-info-interval: 60
        print-job-metrics-info-interval: 60
        classloader-cache-mode: true
        slot-service:
          dynamic-slot: true
          slot-allocation-strategy: SYSTEM_LOAD
        http:
          enable-http: true
          port: 8080
          enable-dynamic-port: false
          port-range: 100
        checkpoint:
          interval: 10000
          timeout: 60000
          storage:
            type: hdfs
            max-retained: 3
            plugin-config:
              namespace: /<release-name>/checkpoint/
              storage.type: s3
              s3.bucket: s3a://<your-s3-bucket>
              fs.s3a.endpoint: s3.<your-region>.amazonaws.com
              fs.s3a.aws.credentials.provider: com.amazonaws.auth.EnvironmentVariableCredentialsProvider
        telemetry:
          metric:
            enabled: true
```

### Running Command

```shell
kubernetes helm deployment
```

### Error Exception

```log
org.apache.seatunnel.engine.common.exception.TaskGroupDeployException: java.lang.RuntimeException: TaskGroupLocation: TaskGroupLocation{jobId=******, pipelineId=1, taskGroupId=2} already exists
	at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:336)
	at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:267)
	at org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation.runInternal(DeployTaskOperation.java:55)
	at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:561)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:390)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:304)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$21(SubPlan.java:673)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:669)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:655)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:624)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:608)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.restorePipeline(SubPlan.java:496)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:699)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

That null pointer exception was fixed, but the jobs got restarted:

org.apache.seatunnel.engine.common.exception.TaskGroupDeployException: java.lang.RuntimeException: TaskGroupLocation: TaskGroupLocation{jobId=1079676837098946584, pipelineId=1, taskGroupId=2} already exists
	at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:336)
	at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:267)
	at org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation.runInternal(DeployTaskOperation.java:55)
	at org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:561)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:390)
	at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:304)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$21(SubPlan.java:673)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:669)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:655)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:624)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:608)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.restorePipeline(SubPlan.java:496)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:699)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
	at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
