ricky2129 opened a new issue, #10568:
URL: https://github.com/apache/seatunnel/issues/10568

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   - SeaTunnel Version: 2.3.12                                                  
                                                                                
       
   - Deployment: Kubernetes (Helm), Zeta engine, separated cluster mode
   - IMap backend: S3 (EAGER loading mode)
   - Trigger: Kubernetes rolling update of master pods (workers stay up)
   
   During a master pod rolling restart, CDC jobs show **FAILED** in the UI and 
REST API while they are **still actively writing data** to the sink.
   
   Error thrown repeatedly on workers:
    org.apache.seatunnel.engine.common.exception.TaskGroupDeployException:
       java.lang.RuntimeException: TaskGroupLocation:
       TaskGroupLocation{jobId=************, pipelineId=1, taskGroupId=2} 
already exists
       at 
org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:336
   
   
   During a master rolling restart, workers keep their tasks running. The new 
master reads from S3 IMap that jobs are 'RUNNING' and calls 
'restoreAllRunningJobFromMasterNodeSwitch()'. This creates fresh 
'PhysicalVertex'
   objects whose *local* 'currExecutionState' is initialized to 'CREATED', 
while the S3 IMap already has 'RUNNING' for those tasks.
   
   'SubPlan.stateProcess()' DEPLOYING case checks 'task.getExecutionState()' 
which reads the *local* field (='CREATED'), not the IMap — so the check passes 
and 'makeTaskGroupDeploy()' is called. Inside, 'updateTaskState(DEPLOYING)' 
reads IMap (`RUNNING` is not an end state) and transitions to 'DEPLOYING'. Then 
'deploy()' sends 'DeployTaskOperation' to the worker.
   
   The worker's 'TaskExecutionService.deployTask()' finds the 
'TaskGroupLocation' already in 'executionContexts' (task IS running) and throws 
"already exists".
   
     This causes an infinite failure cascade:
   
     deploy fails → makeTaskGroupFailing → PhysicalVertex FAILED
     → addPhysicalVertexCallBack → updatePipelineState(FAILING)
     → stateProcess() FAILED → checkNeedRestore → restorePipeline()
     → deploy again → "already exists" → loop
   
   ## Steps to Reproduce
   
     1. Deploy SeaTunnel on Kubernetes with S3-backed IMap (EAGER mode)
     2. Start one or more CDC streaming jobs
     3. Trigger a master rolling update (`kubectl rollout restart 
deployment/<master>`)
     4. Observe: jobs transition to FAILED in UI, workers log "already exists" 
repeatedly
     5. Jobs are still writing to sink despite showing FAILED
   
   ## Expected Behavior
   
   Jobs remain RUNNING through master rolling restart. The new master 
reconnects to already-running tasks on workers without interrupting them.
   
   
   ## Proposed Fix
   
   Make `TaskExecutionService.deployTask()' idempotent for the master failover 
case. When a 'TaskGroupLocation' is found in 'executionContexts' (task is 
actively running — not in 'finishedExecutionContexts'), return success instead 
of throwing. The worker notifies the master of terminal state via 
'NotifyTaskStatusOperation' (using dynamic 'getMasterAddress()') when the task 
eventually completes, maintaining correct lifecycle tracking.
   
   Legitimate redeployment is unaffected: completed tasks are moved to 
'finishedExecutionContexts' by 'taskDone()', so 
'executionContexts.containsKey()' returns false and normal deploy proceeds.
   
   Fix submitted in PR #10567.
   
   ### SeaTunnel Version
   
   2.3.12
   
   ### SeaTunnel Config
   
   ```conf
   apiVersion: v1                                                               
                                                                                
         
     kind: ConfigMap                                                            
     metadata:                                                                  
                                                                                
           
       name: <release-name>-configs                                             
                                                                                
         
       namespace: seatunnel                                                     
                                                                                
           
       labels:                                                                  
                                                                                
         
         app.kubernetes.io/name: <release-name>-configs
         app.kubernetes.io/instance: <release-name>
     data:
       hazelcast-client.yaml: |
         hazelcast-client:
           cluster-name: <release-name>
           properties:
             hazelcast.logging.type: log4j2
           connection-strategy:
             connection-retry:
               cluster-connect-timeout-millis: 3000
           network:
             cluster-members:
               - <release-name>.seatunnel.svc.cluster.local:5801
   
       hazelcast-master.yaml: |
         hazelcast:
           cluster-name: <release-name>
           network:
             rest-api:
               enabled: true
               endpoint-groups:
                 CLUSTER_WRITE:
                   enabled: true
                 DATA:
                   enabled: true
             join:
               kubernetes:
                 enabled: true
                 service-dns: <release-name>.seatunnel.svc.cluster.local
                 service-port: 5801
             port:
               auto-increment: false
               port: 5801
           properties:
             hazelcast.invocation.max.retry.count: 50
             hazelcast.tcp.join.port.try.count: 30
             hazelcast.logging.type: log4j2
             hazelcast.operation.generic.thread.count: 50
             hazelcast.heartbeat.failuredetector.type: phi-accrual
             hazelcast.heartbeat.interval.seconds: 2
             hazelcast.max.no.heartbeat.seconds: 300
             hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
             hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
             hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 
100
             hazelcast.operation.call.timeout.millis: 180000
             hazelcast.connection.timeout.seconds: 60
           map:
             engine*:
               map-store:
                 enabled: true
                 initial-mode: EAGER
                 factory-class-name: 
org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
                 properties:
                   type: hdfs
                   namespace: /<release-name>/imap/
                   clusterName: <release-name>
                   storage.type: s3
                   s3.bucket: s3a://<your-s3-bucket>
                   fs.s3a.endpoint: s3.<your-region>.amazonaws.com
                   fs.s3a.aws.credentials.provider: 
com.amazonaws.auth.EnvironmentVariableCredentialsProvider
   
       hazelcast-worker.yaml: |
         hazelcast:
           cluster-name: <release-name>
           network:
             rest-api:
               enabled: true
               endpoint-groups:
                 CLUSTER_WRITE:
                   enabled: true
                 DATA:
                   enabled: true
             join:
               kubernetes:
                 enabled: true
                 service-dns: <release-name>.seatunnel.svc.cluster.local
                 service-port: 5801
             port:
               auto-increment: false
               port: 5801
           properties:
             hazelcast.invocation.max.retry.count: 50
             hazelcast.tcp.join.port.try.count: 30
             hazelcast.logging.type: log4j2
             hazelcast.operation.generic.thread.count: 50
             hazelcast.heartbeat.failuredetector.type: phi-accrual
             hazelcast.heartbeat.interval.seconds: 2
             hazelcast.max.no.heartbeat.seconds: 300
             hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
             hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
             hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 
100
             hazelcast.operation.call.timeout.millis: 180000
             hazelcast.connection.timeout.seconds: 60
           member-attributes:
             rule:
               type: string
               value: worker
   
       jvm_client_options: |
         -Xms256m
         -Xmx512m
         -XX:+HeapDumpOnOutOfMemoryError
         -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-client
   
       jvm_master_options: |
         -XX:+HeapDumpOnOutOfMemoryError
         -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
         -XX:MaxMetaspaceSize=2g
         -XX:+UseG1GC
   
       jvm_worker_options: |
         -XX:+HeapDumpOnOutOfMemoryError
         -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server
         -XX:MaxMetaspaceSize=2g
         -XX:+UseG1GC
   
       log4j2.properties: |
         monitorInterval = 60
         rootLogger.level = info
         rootLogger.appenderRef.console.ref = consoleAppender
         logger.engine.name = org.apache.seatunnel.engine
         logger.engine.level = info
         logger.debezium.name = io.debezium.connector
         logger.debezium.level = warn
         appender.console.type = Console
         appender.console.name = consoleAppender
         appender.console.target = SYSTEM_OUT
         appender.console.layout.type = PatternLayout
         appender.console.layout.pattern = %d{ISO8601} %-5p [%t] %c{2}: %m%n
   
       seatunnel.yaml: |
         seatunnel:
           engine:
             history-job-expire-minutes: 1440
             backup-count: 1
             queue-type: blockingqueue
             print-execution-info-interval: 60
             print-job-metrics-info-interval: 60
             classloader-cache-mode: true
             slot-service:
               dynamic-slot: true
               slot-allocation-strategy: SYSTEM_LOAD
             http:
               enable-http: true
               port: 8080
               enable-dynamic-port: false
               port-range: 100
             checkpoint:
               interval: 10000
               timeout: 60000
               storage:
                 type: hdfs
                 max-retained: 3
                 plugin-config:
                   namespace: /<release-name>/checkpoint/
                   storage.type: s3
                   s3.bucket: s3a://<your-s3-bucket>
                   fs.s3a.endpoint: s3.<your-region>.amazonaws.com
                   fs.s3a.aws.credentials.provider: 
com.amazonaws.auth.EnvironmentVariableCredentialsProvider
             telemetry:
               metric:
                 enabled: true
   ```
   
   ### Running Command
   
   ```shell
   kubernetes helm deployment
   ```
   
   ### Error Exception
   
   ```log
   org.apache.seatunnel.engine.common.exception.TaskGroupDeployException: 
java.lang.RuntimeException: TaskGroupLocation:                                  
           
     TaskGroupLocation{jobId=******, pipelineId=1, taskGroupId=2} already 
exists at                                                                       
     
org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:336)
 at                                                              
     
org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:267)
 at                                                              
     
org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation.runInternal(DeployTaskOperation.java:55)
 at                                                 
     
org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
 at                                                               
     com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) 
at                                                                              
       
     
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
 at                                                            
     
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
 at                                                             
     
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
 at                                                             
     
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
 at                                                                
     
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
 at                                                                
     
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
 at                                                             
     
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
 at                                                               
     
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:561)
 at                                                           
     
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:390)
 at                                                        
     
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:304)
 at                                                    
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$21(SubPlan.java:673)
 at java.util.ArrayList.forEach(ArrayList.java:1259) at           
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:669)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
 at                                                                  
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:655)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
 at                                                                  
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:624)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:608)
 at                                                             
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.restorePipeline(SubPlan.java:496)
 at                                                                      
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:699)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
 at                                                                  
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
 at                                                   
     
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at 
                                                                                
  
     
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
 at                                                                           
     
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
 at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43) at    
 
     
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at                                                                              
  
     
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:750) that null pointer exception we fixed 
but 
      jobs got restarted 
org.apache.seatunnel.engine.common.exception.TaskGroupDeployException: 
java.lang.RuntimeException: TaskGroupLocation:                         
     TaskGroupLocation{jobId=1079676837098946584, pipelineId=1, taskGroupId=2} 
already exists at                                                               
        
     
org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:336)
 at                                                              
     
org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:267)
 at                                                              
     
org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation.runInternal(DeployTaskOperation.java:55)
 at                                                 
     
org.apache.seatunnel.engine.server.task.operation.TracingOperation.run(TracingOperation.java:42)
 at                                                               
     com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189) 
at                                                                              
       
     
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
 at                                                            
     
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
 at                                                             
     
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
 at                                                             
     
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
 at                                                                
     
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
 at                                                                
     
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
 at                                                             
     
com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
 at                                                               
     
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:561)
 at                                                           
     
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:390)
 at                                                        
     
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:304)
 at                                                    
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$21(SubPlan.java:673)
 at java.util.ArrayList.forEach(ArrayList.java:1259) at           
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:669)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
 at                                                                  
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:655)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
 at                                                                  
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:624)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:608)
 at                                                             
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.restorePipeline(SubPlan.java:496)
 at                                                                      
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:699)
 at                                                                         
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
 at                                                                  
     
org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$addPhysicalVertexCallBack$4(SubPlan.java:228)
 at                                                   
     
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) at 
                                                                                
  
     
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
 at                                                                           
     
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
 at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43) at    
 
     
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at                                                                              
  
     
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:750)
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to