victorjourne opened a new issue, #28380: URL: https://github.com/apache/airflow/issues/28380
### Apache Airflow version

2.5.0

### What happened

Celery offers the capability to concurrently run *green threads* such as `gevent` or `eventlet` for IO-bound tasks. I did not manage to set up a simple Airflow project that follows this basic idea:

1. Say _N_ concurrent tasks are scheduled with _dynamic task mapping_ onto a gevent Celery pool of size _C_.
2. The first _C_ tasks are executed correctly and the metadata database is updated, but their status in the result backend (here Postgres) is not updated (**why?**). In the Flower UI, the tasks are still active.
3. After 10 minutes, an attempt is made to complete the tasks. As a result (and despite worrying logs), the task status turns to _success_.
4. The same scenario then repeats for the remaining _N_ - _C_ tasks.

The log of a task: [dag_id=simple_mapping_run_id=scheduled__2022-12-14T12 10 17.314020+00 00_task_id=add_one_map_index=3_attempt=1.log](https://github.com/apache/airflow/files/10236974/dag_id.simple_mapping_run_id.scheduled__2022-12-14T12.10.17.314020%2B00.00_task_id.add_one_map_index.3_attempt.1.log)

The log of the worker: [airflow-worker.log](https://github.com/apache/airflow/files/10236977/airflow-worker.log)

### What you think should happen instead

The result backend database should be updated as soon as a task completes, so that other tasks can start quickly. At first I suspected the Postgres result backend to be the culprit, since it is not clear that Psycopg handles concurrent writes under `gevent`. But after seeing worker log warnings identical to the ones in #8164 about _gevent monkey-patching_, I have doubts (see the psycopg2/gevent addendum below).

### How to reproduce

- Strictly follow the [docker-compose Airflow 2.5.0](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#fetching-docker-compose-yaml) instructions.
- Add the env var `AIRFLOW__CELERY__POOL: 'gevent'` to `airflow-common-env`.
- Launch this simple DAG from the Airflow UI:

```
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="simple_mapping",
    catchup=False,
    start_date=datetime(2022, 3, 4),
    max_active_tasks=200,
) as dag:

    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    xlist = list(range(25))
    added_values = add_one.expand(x=xlist)
    sum_it(added_values)
```

- Observe the airflow-worker logs, the task flow in the Airflow UI, and the active tasks in Flower.

A standalone Celery check, independent of Airflow, is sketched in the first addendum below.

### Operating System

Ubuntu 22.04.1 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

Docker Engine - Community - Version: 20.10.18
Docker Compose version v2.10.2

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
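### Addendum: standalone Celery check outside Airflow

To separate Airflow from Celery here, the prompt-update question can also be posed with a bare Celery app against the same kind of broker and result backend. Below is a minimal sketch, not my actual deployment; the app name and the broker/backend URLs are placeholders loosely matching the docker-compose defaults.

```
# repro.py -- minimal Celery app to check whether the SQLAlchemy result
# backend is updated promptly under the gevent pool.
# The broker/backend URLs are placeholders, not taken from this issue.
from celery import Celery

app = Celery(
    "repro",
    broker="redis://localhost:6379/0",
    backend="db+postgresql://airflow:airflow@localhost:5432/airflow",
)

@app.task
def add_one(x: int) -> int:
    return x + 1

if __name__ == "__main__":
    # Fan out 25 tasks, mirroring the mapped DAG above, and wait on each
    # result. With a healthy backend the states should flip to SUCCESS
    # within seconds, not after ~10 minutes.
    results = [add_one.delay(i) for i in range(25)]
    for r in results:
        print(r.id, r.get(timeout=30))
```

Run the worker with the same pool, e.g. `celery -A repro worker --pool gevent --concurrency 16`. If every `get()` returns promptly here, the delay would seem specific to how Airflow uses the result backend; if the results also hang for ~10 minutes, it points at Celery/gevent itself.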
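### Addendum: psycopg2 under gevent

On the Psycopg suspicion above: psycopg2 is a C extension, so gevent's `monkey.patch_all()` alone does not make it cooperative; the usual workaround is the wait callback from the `psycogreen` package. I have not verified whether this applies to Airflow's result-backend code path, so the sketch below only shows the standard patch order.

```
# Standard ordering for making psycopg2 cooperative under gevent.
from gevent import monkey
monkey.patch_all()  # must run before modules that use socket/ssl are imported

from psycogreen.gevent import patch_psycopg
patch_psycopg()  # registers a gevent-aware wait callback for psycopg2

import psycopg2

# Connections opened from here on yield to the gevent hub while waiting
# on the socket, instead of blocking every green thread in the worker.
# The DSN is a placeholder matching the docker-compose defaults.
conn = psycopg2.connect("dbname=airflow user=airflow password=airflow host=localhost")
```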
