victorjourne opened a new issue, #28380:
URL: https://github.com/apache/airflow/issues/28380

   ### Apache Airflow version
   
   2.5.0
   
   ### What happened
   
   Celery offers the capability to run tasks concurrently on *green threads*, via `gevent` or `eventlet`, which suits IO-bound tasks.
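   
   As a minimal illustration of that idea (plain Celery, not Airflow; the broker/backend URLs and the task body are placeholders):
   
   ```python
   import time
   
   from celery import Celery
   
   app = Celery(
       "io_demo",
       broker="redis://localhost:6379/0",  # placeholder broker URL
       backend="db+postgresql://user:pass@localhost/celery",  # placeholder result backend
   )
   
   @app.task
   def fetch(url: str) -> int:
       # Under a gevent/eventlet pool the worker monkey-patches blocking IO,
       # so this sleep (a stand-in for an HTTP call) yields to other green
       # threads instead of blocking a whole worker process.
       time.sleep(1)
       return len(url)
   
   # Started with green threads, e.g.:
   #   celery -A io_demo worker --pool=gevent --concurrency=100
   ```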
   
   I didn't manage to set up a simple Airflow project that follows this basic idea.
   
   1. Say _N_ concurrent tasks are scheduled with a _dynamic task mapping_ onto a gevent Celery pool of size _C_.
   2. The first _C_ tasks are executed correctly and the metadata database is updated, but their status in the result backend (here Postgres) is not updated (**why?**). In the Flower UI, the tasks are still active (see the sketch after this list).
   3. After 10 minutes, an attempt is made to complete the tasks. As a result (and despite worrying logs), the task statuses turn to _success_.
   4. The same scenario repeats for the remaining _N_ − _C_ tasks.
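   
   A hedged way to see the mismatch directly is to query the Celery result backend; this assumes the default SQLAlchemy backend table `celery_taskmeta` and a placeholder Postgres DSN, both to be adjusted to the deployment:
   
   ```python
   import psycopg2
   
   # Placeholder DSN; point it at the database used as the Celery result backend.
   conn = psycopg2.connect("dbname=airflow user=airflow password=airflow host=localhost")
   with conn, conn.cursor() as cur:
       cur.execute(
           "SELECT task_id, status, date_done"
           " FROM celery_taskmeta ORDER BY date_done DESC NULLS LAST LIMIT 10"
       )
       for task_id, status, date_done in cur.fetchall():
           # Rows stuck in STARTED/PENDING here, while the Airflow metadata DB
           # already shows the task as done, are the mismatch described above.
           print(task_id, status, date_done)
   conn.close()
   ```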
   
   The log of a task: [dag_id=simple_mapping_run_id=scheduled__2022-12-14T12 10 17.314020+00 00_task_id=add_one_map_index=3_attempt=1.log](https://github.com/apache/airflow/files/10236974/dag_id.simple_mapping_run_id.scheduled__2022-12-14T12.10.17.314020%2B00.00_task_id.add_one_map_index.3_attempt.1.log)
   
   The log of the worker: [airflow-worker.log](https://github.com/apache/airflow/files/10236977/airflow-worker.log)
   
   
   ### What you think should happen instead
   
   The result backend database should be updated as soon as a task completes, so that the remaining tasks can start quickly.
   
   At first, I suspected the `Postgres` result backend to be the culprit, since it is not clear that psycopg2 handles concurrent writes under `gevent`.
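   
   If that suspicion is right, the usual experiment (not something the stock setup does, as far as I can tell) is to make psycopg2 cooperative under gevent with the `psycogreen` package:
   
   ```python
   # Must run before any psycopg2 connection is created, e.g. at interpreter
   # startup in the worker process.
   from psycogreen.gevent import patch_psycopg
   
   patch_psycopg()
   
   # From here on, psycopg2 waits on sockets through the gevent hub, so one
   # green thread writing results to Postgres no longer blocks the others.
   ```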
   
   But after seeing worker log warnings identical to those in #8164 about _gevent monkey-patching_, I have doubts.
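   
   For context, the warning discussed in #8164 is about patching happening too late; gevent requires the patch to run before modules like `ssl` are imported, roughly:
   
   ```python
   from gevent import monkey
   
   monkey.patch_all()  # must come before (virtually) all other imports
   
   import ssl  # noqa: E402  -- safe only because patch_all() already ran
   ```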
   
   ### How to reproduce
   
   - Strictly follow the [docker compose airflow 2.5.0](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#fetching-docker-compose-yaml) instructions.
   
   - Add the env var `AIRFLOW__CELERY__POOL: 'gevent'` to `airflow-common-env`.
   
   - Launch this simple DAG from the Airflow UI: 
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   
   with DAG(dag_id="simple_mapping",
           catchup=False,
           start_date=datetime(2022, 3, 4),
           max_active_tasks=200) as dag:
   
       @task
       def add_one(x: int):
           return x + 1
   
       @task
       def sum_it(values):
           total = sum(values)
           print(f"Total was {total}")
   
       xlist = list(range(25))
       added_values = add_one.expand(x=xlist)
   
       sum_it(added_values)
   ```
   
   - Observe the airflow-worker logs, the task flow in the Airflow UI, and the active tasks in Flower; a polling sketch for Flower follows.
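   
   A small polling sketch against Flower's REST API (assuming the docker-compose default of Flower on `localhost:5555`, started with the `flower` profile):
   
   ```python
   import json
   import urllib.request
   
   # Flower exposes the task registry at /api/tasks.
   with urllib.request.urlopen("http://localhost:5555/api/tasks") as resp:
       tasks = json.load(resp)
   
   for task_id, info in tasks.items():
       # Tasks that Airflow already considers done but that stay in a
       # non-terminal state here match the behaviour described above.
       print(task_id, info.get("state"), info.get("name"))
   ```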
   
   
   ### Operating System
   
   Ubuntu 22.04.1 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Docker Engine - Community - Version: 20.10.18
   Docker Compose version v2.10.2
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

