howard-oc opened a new issue, #55766: URL: https://github.com/apache/airflow/issues/55766
### Apache Airflow version

3.0.6

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

When configuring Airflow 3.0.6 with multiple executors concurrently using `executor: "CeleryExecutor,KubernetesExecutor"`, tasks with `queue='kubernetes'` are not being routed to the KubernetesExecutor. Instead, all tasks are executed by the CeleryExecutor regardless of their queue assignment.

## Expected Behavior

According to the [Airflow documentation](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently):

- Tasks without a `queue` parameter → CeleryExecutor (default)
- Tasks with `queue='kubernetes'` → KubernetesExecutor

## Actual Behavior

- **All tasks are executed by CeleryExecutor** regardless of queue assignment
- Tasks with `queue='kubernetes'` are executed by `CeleryExecutor(parallelism=32)` instead of the KubernetesExecutor
- No Kubernetes pods are created for tasks with `queue='kubernetes'`

## Configuration

### Helm Values (airflow.cfg)

```yaml
# Multiple Executors (Airflow 3.x+)
executor: "CeleryExecutor,KubernetesExecutor"

airflow:
  config:
    # Multiple Executors Concurrently (Airflow 3.0+)
    AIRFLOW__CORE__EXECUTOR: "CeleryExecutor,KubernetesExecutor"
    # Kubernetes queue configuration
    AIRFLOW__KUBERNETES__KUBERNETES_QUEUE: "kubernetes"
```

### Generated airflow.cfg

```ini
[core]
executor = CeleryExecutor,KubernetesExecutor

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[kubernetes_executor]
multi_namespace_mode = False
namespace = airflow
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = apache/airflow
worker_container_tag = 3.0.6
```

## Environment Details

- **Airflow Version**: 3.0.6
- **Python Version**: 3.12
- **Kubernetes Version**: 1.28+ (Kind cluster)
- **Helm Chart**: apache-airflow/airflow 1.18.0
- **Deployment Method**: Helm + ArgoCD

## Steps to Reproduce

1. Deploy Airflow 3.0.6 with the configuration above
2. Set `executor: "CeleryExecutor,KubernetesExecutor"` in Helm values
3. Set `AIRFLOW__CORE__EXECUTOR: "CeleryExecutor,KubernetesExecutor"` in environment variables
4. Set `AIRFLOW__KUBERNETES__KUBERNETES_QUEUE: "kubernetes"` in environment variables
5. Create a DAG with tasks using `queue='kubernetes'`
6. Trigger the DAG
7. Observe that all tasks are executed by the CeleryExecutor

## Expected vs Actual Results

| Task | Queue | Expected Executor | Actual Executor | Status |
|------|-------|-------------------|-----------------|--------|
| celery_python_task | default | CeleryExecutor | CeleryExecutor | ✅ Working |
| kubernetes_python_task | kubernetes | KubernetesExecutor | CeleryExecutor | ❌ **Bug** |
| celery_bash_task | default | CeleryExecutor | CeleryExecutor | ✅ Working |

## Impact

- **High Impact**: Core functionality documented in the official Airflow documentation is not working
- **Workaround**: None available - tasks cannot be routed to the KubernetesExecutor
- **Affected Users**: All users trying to use multiple executors concurrently in Airflow 3.x

## Additional Information

### Configuration Verification

```bash
# Executor configuration
$ airflow config get-value core executor
CeleryExecutor,KubernetesExecutor

# Kubernetes queue configuration
$ airflow config get-value celery_kubernetes_executor kubernetes_queue
kubernetes

# Environment variable
$ env | grep AIRFLOW__CORE__EXECUTOR
AIRFLOW__CORE__EXECUTOR=CeleryExecutor,KubernetesExecutor
```

### Related Documentation

- [Using Multiple Executors Concurrently](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently)
- [CeleryKubernetesExecutor Deprecation](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#celerykubernetesexecutor)

## Request

Please investigate why the multiple-executors-concurrently feature is not routing tasks with `queue='kubernetes'` to the KubernetesExecutor in Airflow 3.0.6. This appears to be a regression or a configuration issue that prevents the documented functionality from working as expected.

---

**Issue Log Created**: 2025-09-17
**Reporter**: Data Platform Team
**Priority**: High
**Status**: Open

### What you think should happen instead?

_No response_

### How to reproduce

## Test Case

### DAG Configuration

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 17),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'hybrid_executor_test',
    default_args=default_args,
    description='Test DAG for Multiple Executors Concurrently',
    schedule=timedelta(hours=1),
    catchup=False,
    tags=['hybrid', 'test', 'executor'],
)


def celery_python_task():
    import os
    print("Hello from Celery Python Task!")
    print(f"Running in pod: {os.getenv('HOSTNAME')}")
    print(f"Executor: {os.getenv('AIRFLOW__CORE__EXECUTOR')}")


def kubernetes_python_task():
    import os
    print("Hello from Kubernetes Python Task!")
    print(f"Running in pod: {os.getenv('HOSTNAME')}")
    print(f"Executor: {os.getenv('AIRFLOW__CORE__EXECUTOR')}")


# Task 1: Should use CeleryExecutor (no queue specified)
celery_task = PythonOperator(
    task_id='celery_python_task',
    python_callable=celery_python_task,
    dag=dag,
)

# Task 2: Should use KubernetesExecutor (queue='kubernetes')
kubernetes_task = PythonOperator(
    task_id='kubernetes_python_task',
    python_callable=kubernetes_python_task,
    queue='kubernetes',  # This should route to KubernetesExecutor
    dag=dag,
)

# Task 3: Should use CeleryExecutor (no queue specified)
celery_bash_task = BashOperator(
    task_id='celery_bash_task',
    bash_command='echo "Hello from Celery Bash Task! Running on $(hostname)" && sleep 5',
    dag=dag,
)

# Set task dependencies
celery_task >> kubernetes_task >> celery_bash_task
```

## Observed Behavior

### Task Execution Logs

```
[2025-09-17T11:04:30.751+0000] {scheduler_job_runner.py:879} INFO - TaskInstance Finished: dag_id=hybrid_executor_test, task_id=celery_python_task, run_id=manual__2025-09-17T11:04:23.446622+00:00_ssPeqR6R, map_index=-1, run_start_date=None, run_end_date=None, run_duration=None, state=queued, executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=1, max_tries=1, pool=default_pool, queue=default, priority_weight=3, operator=PythonOperator, queued_dttm=2025-09-17 11:04:24.733620+00:00, scheduled_dttm=2025-09-17 11:04:24.710596+00:00, queued_by_job_id=47, pid=None
[2025-09-17T11:04:30.988+0000] {scheduler_job_runner.py:879} INFO - TaskInstance Finished: dag_id=hybrid_executor_test, task_id=kubernetes_python_task, run_id=manual__2025-09-17T10:29:05.176689+00:00_eqFN1IEs, map_index=-1, run_start_date=2025-09-17 11:04:30.785718+00:00, run_end_date=2025-09-17 11:04:30.785718+00:00, run_duration=0.0, state=failed, executor=CeleryExecutor(parallelism=32), executor_state=failed, try_number=1, max_tries=1, pool=default_pool, queue=kubernetes, priority_weight=2, operator=PythonOperator, queued_dttm=2025-09-17 10:53:35.344116+00:00, scheduled_dttm=2025-09-17 10:53:31.646261+00:00, queued_by_job_id=47, pid=None
```

**Key Observations:**

1. `celery_python_task` (no queue) → `executor=CeleryExecutor(parallelism=32), queue=default` ✅ **Expected**
2. `kubernetes_python_task` (queue='kubernetes') → `executor=CeleryExecutor(parallelism=32), queue=kubernetes` ❌ **Unexpected**

### Kubernetes Executor Status

```
[2025-09-17T11:07:14.896+0000] {kubernetes_executor_utils.py:95} INFO - Kubernetes watch timed out waiting for events. Restarting watch.
[2025-09-17T11:07:15.944+0000] {kubernetes_executor_utils.py:132} INFO - Event: and now my watch begins starting at resource_version: 0
```

The Kubernetes executor is running and watching for events, but no pods are created.

### Operating System

Ubuntu

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
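One detail worth cross-checking against the linked documentation: Airflow's multi-executor support (AIP-61, available since 2.10) routes tasks via the task-level `executor` parameter (e.g. `PythonOperator(..., executor="KubernetesExecutor")`), whereas `queue`-based routing via `kubernetes_queue` was the mechanism of the now-deprecated CeleryKubernetesExecutor. If that reading is right, the behavior reported above may be the documented default rather than a regression. A minimal, pure-Python sketch of that routing rule (no Airflow install required; `Task` and `route` are hypothetical names used only for illustration, not Airflow APIs):

```python
# Toy model of the per-task executor routing described for hybrid executors
# (AIP-61): a task may name an executor explicitly; otherwise it falls back
# to the first (default) executor in [core] executor. Note that `queue`
# alone does not reroute a task in this model.
from dataclasses import dataclass
from typing import Optional

# Mirrors executor = CeleryExecutor,KubernetesExecutor in airflow.cfg;
# the first entry acts as the default.
CONFIGURED_EXECUTORS = ["CeleryExecutor", "KubernetesExecutor"]


@dataclass
class Task:
    task_id: str
    queue: str = "default"
    executor: Optional[str] = None  # e.g. "KubernetesExecutor"


def route(task: Task) -> str:
    """Pick an executor for a task, mimicking the documented behavior."""
    if task.executor is not None:
        if task.executor not in CONFIGURED_EXECUTORS:
            raise ValueError(f"unknown executor {task.executor!r}")
        return task.executor
    # No explicit executor: default executor handles it, whatever the queue.
    return CONFIGURED_EXECUTORS[0]


print(route(Task("celery_python_task")))                                      # CeleryExecutor
print(route(Task("kubernetes_python_task", queue="kubernetes")))              # CeleryExecutor (queue alone is not enough)
print(route(Task("kubernetes_python_task", executor="KubernetesExecutor")))   # KubernetesExecutor
```

Under that assumption, adding `executor='KubernetesExecutor'` to `kubernetes_python_task` in the reproduction DAG would be the documented way to reach the KubernetesExecutor; whether queue-based routing is also supposed to work in 3.0.6 is exactly what this issue should clarify.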
