CreeperBeatz opened a new issue, #59553: URL: https://github.com/apache/airflow/issues/59553
### Apache Airflow version

3.1.5

### If "Other Airflow 3 version" selected, which one?

_No response_

### What happened?

For my workload, I ingest video files and then create tasks to process each one. It is normal to create 30+ tasks that get processed over the span of 40+ minutes. On a backfill, it is entirely normal for my GPU workers to be unavailable and for a task to wait 1 h+ before being executed. This is why I raised `AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT` to 10,000 (see image below).

<img width="853" height="313" alt="Image" src="https://github.com/user-attachments/assets/7694da8f-5ef9-498a-bd5f-f82bb8f09c07" />

I also changed my task definition (`execution_timeout`) to allow a task to run for up to 24 hours, so that is not the issue either.

<img width="505" height="206" alt="Image" src="https://github.com/user-attachments/assets/c533b354-cecc-4507-b79f-9dc46842ffbc" />

However, even with both settings in place, my tasks fail while queued and the retry mechanism kicks in. I have band-aid fixed it by increasing the number of retries, but that is not optimal.

<img width="592" height="207" alt="Image" src="https://github.com/user-attachments/assets/de49a376-7e57-4af4-b2b8-ea1d4dba762d" />

### What you think should happen instead?

`AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT` should be taken into account, and tasks should not time out after around 15 minutes. All tasks eventually complete. With 20 runs × 2.5 min each and 1 worker, the last task should wait ~50 min in the queue - well within the 100,000 s timeout.

### How to reproduce

### 1. Install Airflow

```bash
uv pip install "apache-airflow[celery]==3.1.5" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.1.5/constraints-3.12.txt"
```

### 2. Initialize and configure

```bash
airflow db migrate
sed -i 's/^executor = .*/executor = CeleryExecutor/' airflow.cfg
sed -i 's/^task_queued_timeout = .*/task_queued_timeout = 100000.0/' airflow.cfg
```

To confirm the value actually took effect, see the check at the end of this report.

### 3. Create the DAG

Save as `dags/long_running_task_dag.py`:

```python
from datetime import datetime, timedelta
from time import sleep

from airflow import DAG
# Airflow 3: PythonOperator is shipped by the standard provider
from airflow.providers.standard.operators.python import PythonOperator


def long_running_task():
    """Simulate GPU processing - sleep for 2.5 minutes."""
    sleep(150)


default_args = {
    "owner": "airflow",
    "retries": 0,
    "execution_timeout": timedelta(hours=24),
}

with DAG(
    dag_id="long_running_task_bug_repro",
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    process_task = PythonOperator(
        task_id="gpu_simulation",
        python_callable=long_running_task,
        queue="gpu_queue",
    )
```

### 4. Start services

Terminal 1 - core services (API server, scheduler, DAG processor, triggerer, default-queue worker):

```bash
airflow api-server --port 8080 &
airflow scheduler &
airflow dag-processor &
airflow triggerer &
airflow celery worker --queues default
```

Terminal 2 - single worker (simulates limited GPU resources):

```bash
airflow celery worker --queues gpu_queue --concurrency 1
```

### 5. Trigger multiple DAG runs

```bash
for i in $(seq 1 20); do
  airflow dags trigger long_running_task_bug_repro
done
```

### 6. Wait ~15 minutes

Tasks still sitting in the queue fail after roughly 15 minutes, even though `task_queued_timeout` is set to 100,000 s.

### Operating System

Ubuntu 22.04 LTS

### Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.14.0
apache-airflow-providers-common-compat==1.10.0
apache-airflow-providers-common-io==1.7.0
apache-airflow-providers-common-sql==1.30.0
apache-airflow-providers-smtp==2.4.0
apache-airflow-providers-standard==1.10.0

### Deployment

Virtualenv installation

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
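A quick way to confirm that the raised timeout actually reached the scheduler (a minimal sketch using the standard `airflow config get-value` CLI; the expected value assumes the `airflow.cfg` edit from repro step 2):

```bash
# Print the task_queued_timeout value the scheduler will resolve.
# Env vars such as AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT take
# precedence over airflow.cfg, so run this in the scheduler's environment.
airflow config get-value scheduler task_queued_timeout
# Expected for this repro: 100000.0 (the shipped default is 600.0)
```

If this prints `600.0` instead of `100000.0`, the override never reached the scheduler process.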
### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
