JDarDagran opened a new issue, #39232:
URL: https://github.com/apache/airflow/issues/39232

   ### Apache Airflow Provider(s)
   
   celery, openlineage
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Apache Airflow version
   
   2.9.0 but happens on 2.7+ too
   
   ### Operating System
   
   Darwin MacBook-Pro.local 23.4.0 Darwin Kernel Version 23.4.0: Fri Mar 15 
00:10:42 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6000 arm64
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   The issue can be reproduced in all environments, both locally with Breeze and in cloud deployments, e.g. Astro Cloud.
   
   ### What happened
   
   The OpenLineage listener hooks into DagRun state changes via `on_dag_run_running/failed/success`. When OL events are emitted via HTTP at a large scale, the scheduler hangs and needs a restart. **The issue appears to happen only with `CeleryExecutor`.**
   
   This could not be reproduced when disabling OpenLineage (with `[openlineage] disabled = True`) or with any other OpenLineage transport that does not use HTTP. I also experimented with raw `urllib3` and `httpx` as alternatives to `requests`. All of these experiments produced the same bug, resulting in the scheduler hanging.
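
   For reference, the comparison runs that ruled OpenLineage/HTTP in or out can be configured roughly like this (a sketch, not my exact setup; the env-var names follow Airflow's `AIRFLOW__SECTION__KEY` convention and `console` is just one example of a non-HTTP OpenLineage transport):

   ```
   # disable the OpenLineage provider entirely ([openlineage] disabled = True)
   AIRFLOW__OPENLINEAGE__DISABLED=True
   # or keep it enabled but switch to a non-HTTP transport, e.g. console
   AIRFLOW__OPENLINEAGE__TRANSPORT={"type": "console"}
   ```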
   
   ### What you think should happen instead
   
   When reproducing with a local Breeze setup with CeleryExecutor, there is this strange behaviour:
   
   `htop`:
   
![image](https://github.com/apache/airflow/assets/3889552/9e006e71-cf50-42b9-bff5-0e2e72b080fa)
   
   `lsof | grep CLOSE_WAIT`:
   
![image](https://github.com/apache/airflow/assets/3889552/03087fbf-9744-4301-905a-226b0f721a97)
   
   Stack trace from the scheduler's main loop:
   
   ```bash
   Traceback for thread 152 (airflow) [] (most recent call last):
       (Python) File "/usr/local/bin/airflow", line 8, in <module>
           sys.exit(main())
       (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
           args.func(args)
       (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in 
command
           return func(*args, **kwargs)
       (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
           return f(*args, **kwargs)
       (Python) File 
"/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in 
wrapped_function
           return func(*args, **kwargs)
       (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 58, in scheduler
           run_command_with_daemon_option(
       (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 
85, in run_command_with_daemon_option
           callback()
       (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 61, in <lambda>
           callback=lambda: _run_scheduler_job(args),
       (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 49, in _run_scheduler_job
           run_job(job=job_runner.job, execute_callable=job_runner._execute)
       (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in 
wrapper
           return func(*args, session=session, **kwargs)
       (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
           return execute_job(job, execute_callable=execute_callable)
       (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in 
execute_job
           ret = execute_callable()
       (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 
847, in _execute
           self._run_scheduler_loop()
       (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 
982, in _run_scheduler_loop
           self.job.executor.heartbeat()
       (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 
240, in heartbeat
           self.trigger_tasks(open_slots)
       (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 
298, in trigger_tasks
           self._process_tasks(task_tuples)
       (Python) File 
"/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, 
in _process_tasks
           key_and_async_results = 
self._send_tasks_to_celery(task_tuples_to_send)
       (Python) File 
"/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 342, 
in _send_tasks_to_celery
           key_and_async_results = list(
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", 
line 484, in _chain_from_iterable_of_lists
           for element in iterable:
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", 
line 619, in result_iterator
           yield fs.pop().result()
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", 
line 439, in result
           self._condition.wait(timeout)
       (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
           waiter.acquire()
   ```
   
   Stack trace from one of the spawned scheduler child processes:
   
   ```bash
   Traceback for thread 6363 (airflow) [] (most recent call last):
       (Python) File "/usr/local/bin/airflow", line 8, in <module>
           sys.exit(main())
       (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
           args.func(args)
       (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in 
command
           return func(*args, **kwargs)
       (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
           return f(*args, **kwargs)
       (Python) File 
"/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in 
wrapped_function
           return func(*args, **kwargs)
       (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 58, in scheduler
           run_command_with_daemon_option(
       (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 
85, in run_command_with_daemon_option
           callback()
       (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 61, in <lambda>
           callback=lambda: _run_scheduler_job(args),
       (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", 
line 49, in _run_scheduler_job
           run_job(job=job_runner.job, execute_callable=job_runner._execute)
       (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in 
wrapper
           return func(*args, session=session, **kwargs)
       (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
           return execute_job(job, execute_callable=execute_callable)
       (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in 
execute_job
           ret = execute_callable()
       (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 
847, in _execute
           self._run_scheduler_loop()
       (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 
982, in _run_scheduler_loop
           self.job.executor.heartbeat()
       (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 
240, in heartbeat
           self.trigger_tasks(open_slots)
       (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 
298, in trigger_tasks
           self._process_tasks(task_tuples)
       (Python) File 
"/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, 
in _process_tasks
           key_and_async_results = 
self._send_tasks_to_celery(task_tuples_to_send)
       (Python) File 
"/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 343, 
in _send_tasks_to_celery
           send_pool.map(send_task_to_executor, task_tuples_to_send, 
chunksize=chunksize)
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", 
line 674, in map
           results = super().map(partial(_process_chunk, fn),
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", 
line 608, in map
           fs = [self.submit(fn, *args) for args in zip(*iterables)]
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", 
line 608, in <listcomp>
           fs = [self.submit(fn, *args) for args in zip(*iterables)]
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", 
line 645, in submit
           self._start_queue_management_thread()
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", 
line 584, in _start_queue_management_thread
           self._adjust_process_count()
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", 
line 608, in _adjust_process_count
           p.start()
       (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", 
line 121, in start
           self._popen = self._Popen(self)
       (Python) File "/usr/local/lib/python3.8/multiprocessing/context.py", 
line 277, in _Popen
           return Popen(process_obj)
       (Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", 
line 19, in __init__
           self._launch(process_obj)
       (Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", 
line 75, in _launch
           code = process_obj._bootstrap(parent_sentinel=child_r)
       (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", 
line 315, in _bootstrap
           self.run()
       (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", 
line 108, in run
           self._target(*self._args, **self._kwargs)
       (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", 
line 233, in _process_worker
           call_item = call_queue.get(block=True)
       (Python) File "/usr/local/lib/python3.8/multiprocessing/queues.py", line 
97, in get
           res = self._recv_bytes()
       (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", 
line 216, in recv_bytes
           buf = self._recv_bytes(maxlength)
       (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", 
line 414, in _recv_bytes
           buf = self._recv(4)
       (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", 
line 379, in _recv
           chunk = read(handle, remaining)
   ```
   
   This suggests the bug probably has something to do with file descriptors not being closed properly. I did not follow the whole logic of how the scheduler spawns child processes, but before the scheduler gets stuck it spawns some child processes that close properly and do not cause the issue.
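
   To watch this happen, a small helper like the one below (a sketch, assuming `psutil` is installed; pass the scheduler PID as its only argument) reports the scheduler's open file descriptors and `CLOSE_WAIT` sockets over time, mirroring the `lsof` output above:

   ```python
   from __future__ import annotations

   import sys
   import time

   import psutil  # assumption: psutil is available in the environment


   def watch(pid: int, interval: float = 5.0) -> None:
       """Periodically print FD and CLOSE_WAIT socket counts for a process."""
       proc = psutil.Process(pid)
       while True:
           conns = proc.connections(kind="inet")
           close_wait = sum(1 for c in conns if c.status == psutil.CONN_CLOSE_WAIT)
           print(f"fds={proc.num_fds()} inet_conns={len(conns)} close_wait={close_wait}")
           time.sleep(interval)


   if __name__ == "__main__":
       watch(int(sys.argv[1]))
   ```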
   
   ### How to reproduce
   
   1. Setup breeze.
   2. Create `dynamic_dag.jinja2` template:
   
   ```python
   from __future__ import annotations
   
   from pendulum import datetime
   
   from airflow.decorators import dag, task
   
   @task
   def return_string(path):
       return path
   
   @dag(
       start_date=datetime(2023, 1, 1),
       max_active_runs={{ max_active_runs }},
       schedule=None,
       catchup=False,
   )
   def dynamic_dag_{{ number }}():
       return_string("whatever")
   
   dynamic_dag_{{ number }}()
   ```
   
   3. Create DAGs in the `files/dags` Airflow directory, e.g. with the following script:
   
   ```python
   from __future__ import annotations
   
   import pathlib
   
   import jinja2
   
   DAGS_LOCATION = pathlib.Path.home() / "airflow" / "files" / "dags"  # change if Airflow is in another location
   TEMPLATE_LOCATION = pathlib.Path(__file__).resolve().parent / "dynamic_dag.jinja2"  # this should point to the template file
   NO_OF_DAGS = 10
   MAX_ACTIVE_RUNS = 100
   
   if __name__ == '__main__':
       jinja_env = jinja2.Environment()
       template = jinja_env.from_string(TEMPLATE_LOCATION.read_text())
       for i in range(NO_OF_DAGS):
           with open(DAGS_LOCATION / f"dynamic_dag_{i}.py", 'w') as f:
               f.write(template.render(number=i, max_active_runs=MAX_ACTIVE_RUNS))
   ```
   
   4. Create `trigger_dag` in `files/dags`:
   
   ```python
   from __future__ import annotations
   
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.api.common.trigger_dag import trigger_dag
   from airflow.models.dag import DagModel
   from airflow.models.dagrun import DagRun
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.python import PythonOperator
   from airflow.utils import timezone
   from airflow.utils.types import DagRunType
   
   default_args = {
       "owner": "airflow",
       "start_date": datetime(2023, 1, 1),
       "depends_on_past": False,
   }
   
   NO_OF_DAGS = 10
   NO_OF_TRIGGERS_PER_DAG = 100
   
   def trigger(**kwargs):
       dag_runs = []
       def trigger_dags(x):
           for i in range(NO_OF_DAGS):
               trigger_dag_id = f"dynamic_dag_{i}"
               parsed_execution_date = timezone.utcnow()
               dag_model = DagModel.get_dagmodel(trigger_dag_id)
               if not dag_model:
                   continue
               dag_model.set_is_paused(False)
               run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
               dag_run = trigger_dag(
                   dag_id=trigger_dag_id,
                   run_id=run_id + str(x),
                   execution_date=parsed_execution_date,
                   replace_microseconds=False,
               )
               dag_runs.append(dag_run)
       for i in range(NO_OF_TRIGGERS_PER_DAG):
           trigger_dags(i)
   
   # Main DAG that will trigger DAGs
   with DAG(
       "trigger_dag",
       default_args=default_args,
       schedule_interval=timedelta(days=1),  # Adjust as needed
       catchup=False,
   ) as dag:
       start = EmptyOperator(task_id="start")
   
       trigger_dags = PythonOperator(task_id="trigger_dags", python_callable=trigger)
   
       start >> trigger_dags
   ```
   
   5. Host a local server (skip if you want to use some other one, e.g. `httpbin.org`); a quick sanity check for this server is sketched after this list:
       1. Flask server in `app.py`
       
           
           ```python
           from __future__ import annotations

           import sys
           import time

           from flask import Flask, Response, request

           app = Flask(__name__)

           @app.route('/', defaults={'path': ''})
           @app.route('/<path:path>', methods=['GET', 'HEAD', 'POST', 'PUT', 'DELETE'])
           def catch_all(path):
               # time.sleep(4)  # uncomment to delay response
               if request.is_json:
                   print(request.json, file=sys.stderr)

               return Response('You want path: %s' % path, status=502)  # change the status if needed

           if __name__ == '__main__':
               app.run()
           ```
           
       2. requirements.txt
       
           
           ```
           flask==2.3.2
           python-dateutil==2.8.2
           gunicorn
           ```
           
       3. Dockerfile
       
           
           ```docker
           FROM python:3-alpine AS builder
            
           WORKDIR /app
            
           RUN python3 -m venv venv
           ENV VIRTUAL_ENV=/app/venv
           ENV PATH="$VIRTUAL_ENV/bin:$PATH"
            
           COPY requirements.txt .
           RUN pip install -r requirements.txt
            
           # Stage 2
           FROM python:3-alpine AS runner
            
           WORKDIR /app
            
           COPY --from=builder /app/venv venv
           COPY app.py app.py
            
           ENV VIRTUAL_ENV=/app/venv
           ENV PATH="$VIRTUAL_ENV/bin:$PATH"
           ENV FLASK_APP=app/app.py
            
           EXPOSE 8080
            
           CMD ["gunicorn", "--bind" , ":8080", "--workers", "2", "app:app"]
           ```
           
       4. Build image with: `docker build . -t test-server`
       5. Run with `docker run -eFLASK_RUN_PORT=5000 -eFLASK_APP=app.py --rm -p 5000:8080 test-server`
   6. In `files/airflow-breeze-config/variables.env`, put the following entries:
       
       ```docker
       AIRFLOW__CORE__LAZY_DISCOVER_PROVIDERS=False
       OPENLINEAGE_URL=http://host.docker.internal:5000
       AIRFLOW__CORE__LOGGING_LEVEL=DEBUG # if you want DEBUG logs
       ```
   An example for `httpbin.org` would be to set the following instead of `OPENLINEAGE_URL`:
   ```
   AIRFLOW__OPENLINEAGE__TRANSPORT={"type":"http","url":"https://httpbin.org/status/401","endpoint":""}
   ```
       
   7. Run Breeze with `breeze start-airflow --executor CeleryExecutor` and trigger `trigger_dag`. Wait until the scheduler hangs and DAGs get stuck in the queued/running state.
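
   As mentioned in step 5, a quick sanity check for the local server (a hypothetical helper, assuming the step 5 server listens on `localhost:5000` and the OpenLineage HTTP transport's default `api/v1/lineage` endpoint) before starting Airflow:

   ```python
   from __future__ import annotations

   import requests

   # Hypothetical check for the step 5 server: the catch-all route should answer
   # every path with the configured status (502 in the example app).
   resp = requests.post("http://localhost:5000/api/v1/lineage", json={"ping": "pong"}, timeout=5)
   print(resp.status_code, resp.text)  # expected: 502 / "You want path: api/v1/lineage"
   ```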
   
   ### Anything else
   
   The issue is reproducible with a considerable number of concurrent DagRuns (100+ in my case; it may depend on the setup). I also did some tests with raw `threading` only and/or with the OpenLineage dependencies skipped entirely (sending requests directly from the `on_dag_run_...` hooks).
   
   I am willing to submit a PR that potentially fixes the bug: the change is to use `ProcessPoolExecutor` instead of `ThreadPoolExecutor`. This seems to fix it completely and not break anything else.
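
   A hypothetical sketch of what that direction could look like for the event-emission path (assuming the pool in question is the one sending the HTTP events; the names and URL below are made up): submitting the emission to a small `ProcessPoolExecutor` keeps the HTTP client threads, sockets and locks out of the scheduler's main process, which the Celery executor forks from.

   ```python
   from __future__ import annotations

   from concurrent.futures import ProcessPoolExecutor

   import requests  # stand-in for whatever HTTP client the transport uses

   # Hypothetical sketch, not the provider's actual code: emit events from a
   # separate process instead of from a thread inside the scheduler process.
   _pool = ProcessPoolExecutor(max_workers=1)


   def _emit(payload: dict) -> int:
       # Runs in the worker process; the scheduler process never opens the socket.
       resp = requests.post("http://localhost:5000/api/v1/lineage", json=payload, timeout=5)
       return resp.status_code


   def emit_async(payload: dict) -> None:
       _pool.submit(_emit, payload)
   ```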
   
   In the end, I could not find the root cause. My gut says it has something to do with threading in Python plus some Celery dependency on file descriptors that produces the bug.
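
   As a minimal, self-contained illustration of that kind of hazard (plain Python, nothing Airflow-specific): a thread that holds a lock while blocked on I/O, combined with a `fork()` in the main thread, leaves the child with a copy of the lock that can never be released.

   ```python
   from __future__ import annotations

   import os
   import threading
   import time

   lock = threading.Lock()


   def worker() -> None:
       # Simulates an HTTP client thread holding an internal lock while blocked on I/O.
       with lock:
           time.sleep(30)


   threading.Thread(target=worker, daemon=True).start()
   time.sleep(0.5)  # make sure the worker holds the lock before forking

   pid = os.fork()
   if pid == 0:
       # Child: the worker thread does not exist here, but the lock was copied
       # in the "held" state, so this acquire can never succeed.
       print("child acquired lock:", lock.acquire(timeout=2))  # prints False
       os._exit(0)
   else:
       os.waitpid(pid, 0)
   ```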
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

