yengkhoo opened a new issue, #57618: URL: https://github.com/apache/airflow/issues/57618
### Apache Airflow version

Other Airflow 2/3 version (please specify below)

### If "Other Airflow 2/3 version" selected, which one?

3.0.4

### What happened?

I'm upgrading from Airflow 2.11.0 to Airflow **3.0.4**, using Helm chart v1.18.0 to deploy on EKS; the backend DB is AWS RDS Postgres (`db.t4g.large`, engine 15.12). I am using a custom image, but it only installs extra libraries.

In Airflow 3, when the scheduler replica count is set to 2, a task sometimes gets its TaskInstance created twice with different try_numbers (one created by each scheduler) before the first one has started running. Consequently, two worker pods are created and one of them fails with the `invalid_state` error below. I also reproduced this with versions **3.0.6** and **3.1.1**. I've checked that resource usage for the API servers, schedulers, and the database is still far from the limits.

Extracted scheduler logs for one affected task (newest entries on top): [schedulers-log.csv](https://github.com/user-attachments/files/23261576/schedulers-log.csv)

Example error log from a failed pod:

```
{"timestamp":"2025-10-25T02:49:26.475102Z","level":"info","event":"Executing workload","workload":"ExecuteTask(token='aaabbbbcccc', ti=TaskInstance(id=UUID('01994b35-8445-7dd3-bcd8-ecc39e0e448b'), task_id='spam_7.task_2', dag_id='spam_tasks_multiple_groups_dag', run_id='scheduled__2025-10-25T02:30:00+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=6, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=None), dag_rel_path=PurePosixPath('yeng-test/spam-dags.py'), bundle_info=BundleInfo(name='airflow-pipes', version='xx'), log_path='dag_id=spam_tasks_multiple_groups_dag/run_id=manual__2025-10-31T12:08:41+00:00/task_id=spam_7.task_2/attempt=1.log', type='ExecuteTask')","logger":"__main__"}
{"timestamp":"2025-10-25T02:49:27.001212Z","level":"info","event":"Connecting to server:","server":"http://airflow-api-server:8080/execution/","logger":"__main__"}
{"timestamp":"2025-10-25T02:49:27.068125Z","level":"info","event":"Secrets backends loaded for worker","count":1,"backend_classes":["EnvironmentVariablesBackend"],"logger":"supervisor"}
{"timestamp":"2025-10-25T02:49:27.095790Z","level":"warning","event":"Server error","detail":{"detail":{"reason":"invalid_state","message":"TI was not in a state where it could be marked as running","previous_state":"success"}},"logger":"airflow.sdk.api.client"}
{"timestamp":"2025-10-25T02:49:27.140185Z","level":"info","event":"Process exited","pid":14,"exit_code":-9,"signal_sent":"SIGKILL","logger":"supervisor"}
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 125, in <module>
    main()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 121, in main
    execute_workload(workload)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 66, in execute_workload
    supervise(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 1829, in supervise
    process = ActivitySubprocess.start(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 933, in start
    proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, bundle_info=bundle_info)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 944, in _on_child_started
    ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 152, in start
    resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 1218, in patch
    return self.request(
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 338, in wrapped_f
    return copy(f, *args, **kw)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 477, in __call__
    do = self.iter(retry_state=retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 378, in iter
    result = action(retry_state)
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 480, in __call__
    result = fn(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 735, in request
    return super().request(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 825, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 999, in _send_handling_redirects
    raise exc
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 982, in _send_handling_redirects
    hook(response)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 117, in raise_on_4xx_5xx
    return get_json_error(response) or response.raise_for_status()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 113, in get_json_error
    raise err
airflow.sdk.api.client.ServerResponseError: Server returned error
```
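The duplication can also be confirmed directly in the metadata DB. Below is a minimal sketch of such a query (not part of the original logs): it assumes it runs somewhere the scheduler's DB connection is configured (e.g. inside a scheduler pod), the table and column names follow the Airflow 3 schema (`task_instance` plus `task_instance_history`), and the `dag_id`/`run_id` literals are taken from the failed-pod log above.

```
# Sketch only: list every recorded attempt for the affected run, from both
# the live task_instance table and task_instance_history. Table/column
# names assume the Airflow 3 metadata schema; adjust if your schema differs.
from sqlalchemy import text

from airflow.settings import Session

ATTEMPTS = text("""
    SELECT task_id, map_index, try_number, state, start_date
      FROM task_instance
     WHERE dag_id = :dag_id AND run_id = :run_id
    UNION ALL
    SELECT task_id, map_index, try_number, state, start_date
      FROM task_instance_history
     WHERE dag_id = :dag_id AND run_id = :run_id
     ORDER BY task_id, try_number
""")

params = {
    "dag_id": "spam_tasks_multiple_groups_dag",
    "run_id": "scheduled__2025-10-25T02:30:00+00:00",
}

with Session() as session:
    # A healthy run shows one attempt per task at a time; the bug shows a
    # second try_number appearing before the first attempt has even started.
    for row in session.execute(ATTEMPTS, params):
        print(tuple(row))
```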
### What you think should happen instead?

When the scheduler replica count is more than 1, a task's retry TI should only be created after the previous TI has failed.
### How to reproduce

values:

```
revisionHistoryLimit: 1
airflowVersion: 3.0.4
images:
  airflow:
    repository: my-custom-image
    tag: dev
    pullPolicy: IfNotPresent
  useDefaultImageForMigration: false
  migrationsWaitTimeout: 60
labels: ~
networkPolicies:
  enabled: false
securityContexts:
  pod:
    runAsUser: 50000
    runAsGroup: 0
    fsGroup: 50000
    fsGroupChangePolicy: "OnRootMismatch"
    runAsNonRoot: true
  containers:
    allowPrivilegeEscalation: false
    capabilities:
      drop:
        - ALL
    readOnlyRootFilesystem: true
# empty directory required to set readOnlyRootFilesystem
volumes:
  - name: tmp
    emptyDir: {}
  - name: opt
    emptyDir: {}
volumeMounts:
  - name: tmp
    mountPath: /tmp
  - name: opt
    mountPath: /opt/airflow
fernetKeySecretName: airflow-fernet-secret
env:
  - name: OPENLINEAGE_DISABLED
    value: "true"
config:
  core:
    auth_manager: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
    dagbag_import_timeout: 200
    parallelism: 200
  dag_processor:
    dag_file_processor_timeout: 300
    dag_bundle_config_list: "[{\"name\":\"airflow-dags\",\"classpath\":\"airflow.providers.git.bundles.git.GitDagBundle\",\"kwargs\":{\"subdir\":\"dags\",\"tracking_ref\":\"yeng-test-v3\",\"git_conn_id\":\"airflow-dags-dagbundle\",\"refresh_interval\":900}}]"
  database:
    sql_alchemy_pool_enabled: 'False'
    sql_alchemy_max_overflow: 20
    sql_alchemy_pool_size: 10
  kubernetes_executor:
    delete_worker_pods: true
  scheduler:
    parsing_processes: 4
  logging:
    remote_logging: 'True'
    remote_base_log_folder: s3://airflow-logs-test/
    # Connection name created to connect to S3.
    remote_log_conn_id: aws_s3_logging
    encrypt_s3_logs: False
  api:
    expose_config: 'True'
    base_url: 'https://airflow-sbx.mydomain.net'
ingress:
  apiServer:
    enabled: true
    annotations: ...
    hosts:
      - name: airflow-sbx.mydomain.net
    ingressClassName: alb
    pathType: "Prefix"
apiSecretKeySecretName: airflow-apiserver-secret
jwtSecretName: airflow-api-jwt-secret
webserver:
  defaultUser:
    enabled: false
apiServer:
  replicas: 2
  resources:
    limits:
      cpu: 3
      memory: 5Gi
    requests:
      cpu: 3
      memory: 5Gi
  topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: "topology.kubernetes.io/zone"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: apiServer
    - maxSkew: 1
      topologyKey: "kubernetes.io/hostname"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: apiServer
  apiServerConfig: ~ # i am using okta
  serviceAccount:
    annotations:
      eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
scheduler:
  replicas: 2
  resources:
    limits:
      cpu: 1.5
      memory: 2.5Gi
    requests:
      cpu: 1.5
      memory: 2.5Gi
  livenessProbe:
    initialDelaySeconds: 90
    periodSeconds: 50
    timeoutSeconds: 50
    failureThreshold: 20
  topologySpreadConstraints:
    - maxSkew: 1
      topologyKey: "topology.kubernetes.io/zone"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: scheduler
    - maxSkew: 1
      topologyKey: "kubernetes.io/hostname"
      whenUnsatisfiable: ScheduleAnyway
      labelSelector:
        matchLabels:
          component: scheduler
  serviceAccount:
    annotations:
      eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
workers:
  resources:
    limits:
      cpu: 1000m
      memory: 1024Mi
    requests:
      cpu: 1000m
      memory: 1024Mi
  logGroomerSidecar:
    resources:
      limits:
        cpu: 1
        memory: 1Gi
      requests:
        cpu: 1
        memory: 1Gi
  securityContexts:
    pod:
      runAsUser: 50000
      runAsGroup: 0
      fsGroup: 50000
      fsGroupChangePolicy: "OnRootMismatch"
      runAsNonRoot: true
    container:
      allowPrivilegeEscalation: false
      capabilities:
        drop:
          - ALL
      readOnlyRootFilesystem: false
  serviceAccount:
    annotations:
      eks.amazonaws.com/role-arn: arn:aws:iam::xxx:role/kubernetes/AirflowRole
podTemplate: ~
dagProcessor:
  enabled: true
  replicas: 1
executor: "KubernetesExecutor"
allowPodLaunching: true
dags:
  persistence:
    enabled: false
logs:
  persistence:
    enabled: false
redis:
  enabled: false
postgresql:
  enabled: false
createUserJob:
  useHelmHooks: false
  applyCustomEnv: false
migrateDatabaseJob:
  enabled: true
  useHelmHooks: false
  applyCustomEnv: false
  jobAnnotations:
    "argocd.argoproj.io/hook": Sync
data:
  metadataSecretName: airflow-pgbouncer-secret
pgbouncer:
  enabled: true
  replicas: 1
  revisionHistoryLimit: 1
  sslmode: "disable"
  ciphers: "normal"
  auth_type: md5
  command: ["pgbouncer", "-u", "nobody", "/etc/pgbouncer-config/pgbouncer.ini"]
  configSecretName: airflow-pgbouncer-ini-custom-secret
  extraVolumes: ...
  resources:
    limits:
      cpu: 1
      memory: 1Gi
    requests:
      cpu: 1
      memory: 1Gi
```
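Note that the values above do not override `[scheduler] use_row_level_locking`, which I believe defaults to `True` and is the mechanism that normally prevents two schedulers from picking up the same task instance. A minimal sketch (an assumption on my part, to be run inside a scheduler pod where the Airflow config is loaded) to double-check the effective value:

```
# Sketch: print the effective row-level-locking setting. With this set to
# True (the default), two schedulers should never create competing
# attempts for the same task instance.
from airflow.configuration import conf

locking = conf.getboolean("scheduler", "use_row_level_locking", fallback=True)
print(f"[scheduler] use_row_level_locking = {locking}")
```

If this printed `False` with two scheduler replicas, duplicate scheduling would be expected rather than a bug; since nothing in the values above overrides it, it should print `True` here.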
A DAG that can reproduce the issue is below. After triggering it, tasks get scheduled and run as usual, but after a while failed worker pods start showing up.

```
from airflow.sdk import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
import time


# Task function
def sleep_task(task_id):
    print(f"Starting {task_id} ...")
    time.sleep(30)
    print(f"Finished {task_id}.")


# Default arguments
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 4,
}

with DAG(
    dag_id="spam_tasks_multiple_groups_dag",
    default_args=default_args,
    description="DAG with TaskGroups, each running multiple parallel tasks",
    schedule=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["test", "parallel", "taskgroup", "load"],
) as dag:
    start = PythonOperator(
        task_id="start",
        python_callable=lambda: print("Starting DAG run"),
    )

    # Create task groups dynamically
    task_groups = []
    for g in range(1, 10):
        with TaskGroup(group_id=f"spam_{g}") as tg:
            for i in range(1, 6):
                PythonOperator(
                    task_id=f"task_{i}",
                    python_callable=sleep_task,
                    op_args=[f"grp{g}_task{i}"],
                )
        task_groups.append(tg)

    end = PythonOperator(
        task_id="end",
        python_callable=lambda: print("All task groups completed."),
    )

    # Define DAG structure: start → all groups (in parallel) → end
    start >> task_groups >> end
```

### Operating System

Debian GNU/Linux 12 (bookworm)

### Versions of Apache Airflow Providers

```
pip freeze | grep apache-airflow-providers
apache-airflow-providers-amazon==9.15.0
apache-airflow-providers-celery==3.12.4
apache-airflow-providers-cncf-kubernetes==10.8.2
apache-airflow-providers-fab==3.0.0
apache-airflow-providers-postgres==6.3.0
apache-airflow-providers-standard==1.9.0
```

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
