yengkhoo opened a new issue, #57618:
URL: https://github.com/apache/airflow/issues/57618

   ### Apache Airflow version
   
   Other Airflow 2/3 version (please specify below)
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   3.0.4
   
   ### What happened?
   
   I'm upgrading from Airflow 2.11.0 to Airflow **3.0.4**, deployed on EKS with Helm chart v1.18.0; the backend DB is AWS RDS Postgres (`db.t4g.large`, engine 15.12). I am using a custom image that only installs extra libraries.
   
   In Airflow 3, with the scheduler replica count set to 2, a task sometimes gets its TaskInstance created twice with different try_number values (one created by each scheduler) before the first one has started running. Consequently, two worker pods are created and one of them fails with the invalid_state error below.
   
   I also tested this with versions **3.0.6** and **3.1.1**. I've checked that resource usage for the API servers, schedulers and database is still far from the limits.
   
   Extracted logs from Schedulers for a task (newest log on top):
   
[schedulers-log.csv](https://github.com/user-attachments/files/23261576/schedulers-log.csv)
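   
   If it helps triage, here is a minimal, illustrative sketch (run from a scheduler pod; it uses the internal ORM session helpers, so adjust if those paths differ on your install) of one way to inspect the affected task instances in the metadata DB. With no real task failures, nothing should ever go past try_number 1 here:
   
   ```
   # Sketch only: list the task instances of the repro DAG; with retries
   # untouched, every task should still be at try_number 1 unless a try
   # actually failed.
   from airflow.models import TaskInstance
   from airflow.utils.session import create_session
   
   with create_session() as session:
       tis = (
           session.query(TaskInstance)
           .filter(TaskInstance.dag_id == "spam_tasks_multiple_groups_dag")
           .order_by(TaskInstance.run_id, TaskInstance.task_id, TaskInstance.try_number)
           .all()
       )
       for ti in tis:
           print(ti.run_id, ti.task_id, ti.try_number, ti.state)
   ```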
   
   Example error log from failed pod:
   >  
{"timestamp":"2025-10-25T02:49:26.475102Z","level":"info","event":"Executing 
workload","workload":"ExecuteTask(token='aaabbbbcccc', 
ti=TaskInstance(id=UUID('01994b35-8445-7dd3-bcd8-ecc39e0e448b'), 
task_id='spam_7.task_2', dag_id='spam_tasks_multiple_groups_dag', 
run_id='scheduled__2025-10-25T02:30:00+00:00', try_number=1, map_index=-1, 
pool_slots=1, queue='default', priority_weight=6, executor_config=None, 
parent_context_carrier={}, context_carrier={}, queued_dttm=None), 
dag_rel_path=PurePosixPath('yeng-test/spam-dags.py'),
bundle_info=BundleInfo(name='airflow-pipes', version='xx'), 
log_path='dag_id=spam_tasks_multiple_groups_dag/run_id=manual__2025-10-31T12:08:41+00:00/task_id=spam_7.task_2/attempt=1.log',
 type='ExecuteTask')","logger":"__main__"}
   
{"timestamp":"2025-10-25T02:49:27.001212Z","level":"info","event":"Connecting 
to 
server:","server":"http://airflow-api-server:8080/execution/","logger":"__main__"}
   {"timestamp":"2025-10-25T02:49:27.068125Z","level":"info","event":"Secrets 
backends loaded for 
worker","count":1,"backend_classes":["EnvironmentVariablesBackend"],"logger":"supervisor"}
   {"timestamp":"2025-10-25T02:49:27.095790Z","level":"warning","event":"Server 
error","detail":{"detail":{"reason":"invalid_state","message":"TI was not in a 
state where it could be marked as 
running","previous_state":"success"}},"logger":"airflow.sdk.api.client"}
   {"timestamp":"2025-10-25T02:49:27.140185Z","level":"info","event":"Process 
exited","pid":14,"exit_code":-9,"signal_sent":"SIGKILL","logger":"supervisor"}
   Traceback (most recent call last):
     File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
       return _run_code(code, main_globals, None,
     File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
       exec(code, run_globals)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py",
 line 125, in <module>
       main()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py",
 line 121, in main
       execute_workload(workload)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py",
 line 66, in execute_workload
       supervise(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 1829, in supervise
       process = ActivitySubprocess.start(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 933, in start
       proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, 
bundle_info=bundle_info)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py",
 line 944, in _on_child_started
       ti_context = self.client.task_instances.start(ti.id, self.pid, 
start_date)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 152, in start
       resp = self.client.patch(f"task-instances/{id}/run", 
content=body.model_dump_json())
     File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 1218, in patch
       return self.request(
     File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
338, in wrapped_f
       return copy(f, *args, **kw)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
477, in __call__
       do = self.iter(retry_state=retry_state)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
378, in iter
       result = action(retry_state)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
400, in <lambda>
       self._add_action_func(lambda rs: rs.outcome.result())
     File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in 
result
       return self.__get_result()
     File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in 
__get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 
480, in __call__
       result = fn(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 735, in request
       return super().request(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 825, in request
       return self.send(request, auth=auth, follow_redirects=follow_redirects)
     File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 914, in send
       response = self._send_handling_auth(
     File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 942, in _send_handling_auth
       response = self._send_handling_redirects(
     File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 999, in _send_handling_redirects
       raise exc
     File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", 
line 982, in _send_handling_redirects
       hook(response)
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 117, in raise_on_4xx_5xx
       return get_json_error(response) or response.raise_for_status()
     File 
"/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", 
line 113, in get_json_error
       raise err
   airflow.sdk.api.client.ServerResponseError: Server returned error
   
   
   ### What you think should happen instead?
   
   When there is more than one scheduler replica, a task's retry TI should only be created after the previous TI has failed.
   
   ### How to reproduce
   
   values:
   
   ```
   revisionHistoryLimit: 1
   airflowVersion: 3.0.4
   images:
     airflow:
       repository: my-custom-image
       tag: dev
       pullPolicy: IfNotPresent
     useDefaultImageForMigration: false
     migrationsWaitTimeout: 60
   labels: ~
   networkPolicies:
     enabled: false
   securityContexts:
     pod:
       runAsUser: 50000
       runAsGroup: 0
       fsGroup: 50000
       fsGroupChangePolicy: "OnRootMismatch"
       runAsNonRoot: true
     containers:
       allowPrivilegeEscalation: false
       capabilities:
         drop:
           - ALL
       readOnlyRootFilesystem: true
   # empty directory required to set readOnlyRootFilesystem
   volumes:
     - name: tmp
       emptyDir: {}
     - name: opt
       emptyDir: {}
   volumeMounts:
     - name: tmp
       mountPath: /tmp
     - name: opt
       mountPath: /opt/airflow
   fernetKeySecretName: airflow-fernet-secret
   env:
     - name: OPENLINEAGE_DISABLED
       value: "true"
   config:
     core:
       auth_manager: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
       dagbag_import_timeout: 200
       parallelism: 200
     dag_processor:
       dag_file_processor_timeout: 300
       dag_bundle_config_list: "[{\"name\":\"airflow-dags\",\"classpath\":\"airflow.providers.git.bundles.git.GitDagBundle\",\"kwargs\":{\"subdir\":\"dags\",\"tracking_ref\":\"yeng-test-v3\",\"git_conn_id\":\"airflow-dags-dagbundle\",\"refresh_interval\":900}}]"
     database:
       sql_alchemy_pool_enabled: 'False'
       sql_alchemy_max_overflow: 20
       sql_alchemy_pool_size: 10
     kubernetes_executor:
       delete_worker_pods: true
     scheduler:
       parsing_processes: 4
     logging:
       remote_logging: 'True'
       remote_base_log_folder: s3://airflow-logs-test/
       # Connection name created to connect to S3.
       remote_log_conn_id: aws_s3_logging
       encrypt_s3_logs: False
     api:
       expose_config: 'True'
       base_url: 'https://airflow-sbx.mydomain.net'
   ingress:
     apiServer:
       enabled: true
       annotations: ...
       hosts:
         - name: airflow-sbx.mydomain.net
       ingressClassName: alb
       pathType: "Prefix"
   apiSecretKeySecretName: airflow-apiserver-secret
   jwtSecretName: airflow-api-jwt-secret
   webserver:
     defaultUser:
       enabled: false
   apiServer:
     replicas: 2
     resources:
       limits:
         cpu: 3
         memory: 5Gi
       requests:
         cpu: 3
         memory: 5Gi
     topologySpreadConstraints:
       - maxSkew: 1
         topologyKey: "topology.kubernetes.io/zone"
         whenUnsatisfiable: ScheduleAnyway
         labelSelector:
           matchLabels:
             component: apiServer
       - maxSkew: 1
         topologyKey: "kubernetes.io/hostname"
         whenUnsatisfiable: ScheduleAnyway
         labelSelector:
           matchLabels:
             component: apiServer
     apiServerConfig: ~ # i am using okta
     serviceAccount:
       annotations:
         eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
   scheduler:
     replicas: 2
     resources:
       limits:
         cpu: 1.5
         memory: 2.5Gi
       requests:
         cpu: 1.5
         memory: 2.5Gi
     livenessProbe:
       initialDelaySeconds: 90
       periodSeconds: 50
       timeoutSeconds: 50
       failureThreshold: 20
     topologySpreadConstraints:
       - maxSkew: 1
         topologyKey: "topology.kubernetes.io/zone"
         whenUnsatisfiable: ScheduleAnyway
         labelSelector:
           matchLabels:
             component: scheduler
       - maxSkew: 1
         topologyKey: "kubernetes.io/hostname"
         whenUnsatisfiable: ScheduleAnyway
         labelSelector:
           matchLabels:
             component: scheduler
     serviceAccount:
       annotations:
         eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
   workers:
     resources:
       limits:
         cpu: 1000m
         memory: 1024Mi
       requests:
         cpu: 1000m
         memory: 1024Mi
     logGroomerSidecar:
       resources:
         limits:
           cpu: 1
           memory: 1Gi
         requests:
           cpu: 1
           memory: 1Gi
     securityContexts:
       pod:
         runAsUser: 50000
         runAsGroup: 0
         fsGroup: 50000
         fsGroupChangePolicy: "OnRootMismatch"
         runAsNonRoot: true
       container:
         allowPrivilegeEscalation: false
         capabilities:
           drop:
             - ALL
         readOnlyRootFilesystem: false
     serviceAccount:
       annotations:
         eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
   podTemplate: ~
   dagProcessor:
     enabled: true
     replicas: 1
   executor: "KubernetesExecutor"
   allowPodLaunching: true
   dags:
     persistence:
       enabled: false
   logs:
     persistence:
       enabled: false
   redis:
     enabled: false
   postgresql:
     enabled: false
   createUserJob:
     useHelmHooks: false
     applyCustomEnv: false
   migrateDatabaseJob:
     enabled: true
     useHelmHooks: false
     applyCustomEnv: false
     jobAnnotations:
       "argocd.argoproj.io/hook": Sync
   data:
     metadataSecretName: airflow-pgbouncer-secret
     enabled: true
     replicas: 1
     revisionHistoryLimit: 1
     sslmode: "disable"
     ciphers: "normal"
     auth_type: md5
     command: ["pgbouncer", "-u", "nobody", 
"/etc/pgbouncer-config/pgbouncer.ini"]
     configSecretName: airflow-pgbouncer-ini-custom-secret 
     extraVolumes: ...
     resources:
       limits:
         cpu: 1
         memory: 1Gi
       requests:
         cpu: 1
         memory: 1Gi
   ```
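   
   None of the multi-scheduler coordination settings are overridden in the values above, so they should still be at their defaults. A small sketch (run from inside a scheduler pod; the option names are the standard Airflow config keys, echoed back only to show what the running scheduler actually sees):
   
   ```
   # Sketch only: print the config the running scheduler sees.
   from airflow.configuration import conf
   
   print(conf.get("core", "executor"))                           # KubernetesExecutor
   print(conf.getint("core", "parallelism"))                     # 200 (set above)
   print(conf.get("database", "sql_alchemy_pool_enabled"))       # 'False' (set above)
   print(conf.getboolean("scheduler", "use_row_level_locking"))  # default True; needed for >1 scheduler
   ```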
   
   
   
   DAG that can reproduce the issue. After triggering it, tasks get scheduled and run as usual, but after a while failed worker pods start showing up.
   
   ```
   from airflow.sdk import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.task_group import TaskGroup
   from datetime import datetime, timedelta
   import time
   
   
   # Task function
   def sleep_task(task_id):
       print(f"Starting {task_id} ...")
       time.sleep(30)
       print(f"Finished {task_id}.")
   
   
   # Default arguments
   default_args = {
       "owner": "airflow",
       "depends_on_past": False,
       "retries": 4,
   }
   
   with DAG(
       dag_id="spam_tasks_multiple_groups_dag",
       default_args=default_args,
       description="DAG with TaskGroups, each running multiple parallel tasks",
       schedule=None, 
       start_date=datetime(2025, 1, 1),
       catchup=False,
       max_active_runs=1,
       tags=["test", "parallel", "taskgroup", "load"],
   ) as dag:
   
       start = PythonOperator(
           task_id="start",
           python_callable=lambda: print("Starting DAG run"),
       )
   
       # Create task groups dynamically
       task_groups = []
       for g in range(1, 10):  
           with TaskGroup(group_id=f"spam_{g}") as tg:
               for i in range(1, 6): 
                   PythonOperator(
                       task_id=f"task_{i}",
                       python_callable=sleep_task,
                       op_args=[f"grp{g}_task{i}"],
                   )
           task_groups.append(tg)
   
       end = PythonOperator(
           task_id="end",
           python_callable=lambda: print("All task groups completed."),
       )
   
       # Define DAG structure: start → all groups (in parallel) → end
       start >> task_groups >> end
   
   ```
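   
   Any manual or scheduled trigger reproduces it; a trivial sketch using the standard `airflow dags trigger` CLI (run from any pod that has the CLI configured):
   
   ```
   # Sketch only: trigger the repro DAG once via the standard Airflow CLI.
   import subprocess
   
   subprocess.run(
       ["airflow", "dags", "trigger", "spam_tasks_multiple_groups_dag"],
       check=True,
   )
   ```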
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==9.15.0
   apache-airflow-providers-celery==3.12.4
   apache-airflow-providers-cncf-kubernetes==10.8.2
   apache-airflow-providers-fab==3.0.0
   apache-airflow-providers-postgres==6.3.0
   apache-airflow-providers-standard==1.9.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

