JDarDagran opened a new issue, #39232: URL: https://github.com/apache/airflow/issues/39232
### Apache Airflow Provider(s)

celery, openlineage

### Versions of Apache Airflow Providers

_No response_

### Apache Airflow version

2.9.0, but it happens on 2.7+ too

### Operating System

Darwin MacBook-Pro.local 23.4.0 Darwin Kernel Version 23.4.0: Fri Mar 15 00:10:42 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6000 arm64

### Deployment

Other

### Deployment details

The issue can be reproduced in all environments, both locally with breeze and in cloud deployments, e.g. Astro Cloud.

### What happened

The OpenLineage listener hooks into DagRun state changes via `on_dag_run_running/failed/success`. When OpenLineage events are emitted over HTTP at large scale, the scheduler hangs and needs a restart. **The issue appears to happen only with `CeleryExecutor`.**

This could not be reproduced with OpenLineage disabled (`[openlineage] disabled = True`) or with any OpenLineage transport that does not use HTTP. I also experimented with raw `urllib3` and `httpx` as alternatives to `requests`; all of these experiments produced the same bug, with the scheduler hanging.
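For context, each of those hooks ultimately boils down to a blocking JSON POST issued from inside the scheduler process. A rough sketch of that essence (the payload is heavily simplified and the helper name is made up; the `api/v1/lineage` endpoint follows the OpenLineage HTTP transport convention — this is not the actual listener code):

```python
from __future__ import annotations

import datetime

import requests


def emit_dag_run_event(base_url: str, dag_id: str, state: str) -> None:
    """Roughly what an on_dag_run_* hook triggers: a blocking JSON POST
    executed from inside the scheduler process."""
    event = {
        "eventType": {"running": "START", "success": "COMPLETE", "failed": "FAIL"}[state],
        "eventTime": datetime.datetime.utcnow().isoformat() + "Z",
        "job": {"namespace": "default", "name": dag_id},
    }
    # The OpenLineage HTTP transport posts events to <url>/api/v1/lineage.
    requests.post(f"{base_url}/api/v1/lineage", json=event, timeout=5)
```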
### What you think should happen instead

When reproducing with a local breeze setup with CeleryExecutor, there is this strange behaviour:

`htop`: (screenshot of the scheduler's process tree)

`lsof | grep CLOSE_WAIT`: (screenshot of sockets stuck in `CLOSE_WAIT`)

Stack from the main loop of the scheduler:

```bash
Traceback for thread 152 (airflow) [] (most recent call last):
    (Python) File "/usr/local/bin/airflow", line 8, in <module>
        sys.exit(main())
    (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
        args.func(args)
    (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
        return f(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 58, in scheduler
        run_command_with_daemon_option(
    (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option
        callback()
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 61, in <lambda>
        callback=lambda: _run_scheduler_job(args),
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
        run_job(job=job_runner.job, execute_callable=job_runner._execute)
    (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
        return func(*args, session=session, **kwargs)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
        return execute_job(job, execute_callable=execute_callable)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in execute_job
        ret = execute_callable()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 847, in _execute
        self._run_scheduler_loop()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 982, in _run_scheduler_loop
        self.job.executor.heartbeat()
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 240, in heartbeat
        self.trigger_tasks(open_slots)
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 298, in trigger_tasks
        self._process_tasks(task_tuples)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, in _process_tasks
        key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 342, in _send_tasks_to_celery
        key_and_async_results = list(
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 484, in _chain_from_iterable_of_lists
        for element in iterable:
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 619, in result_iterator
        yield fs.pop().result()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 439, in result
        self._condition.wait(timeout)
    (Python) File "/usr/local/lib/python3.8/threading.py", line 302, in wait
        waiter.acquire()
```

Stack from one of the child processes spawned by the scheduler:

```bash
Traceback for thread 6363 (airflow) [] (most recent call last):
    (Python) File "/usr/local/bin/airflow", line 8, in <module>
        sys.exit(main())
    (Python) File "/opt/airflow/airflow/__main__.py", line 58, in main
        args.func(args)
    (Python) File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
        return f(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
        return func(*args, **kwargs)
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 58, in scheduler
        run_command_with_daemon_option(
    (Python) File "/opt/airflow/airflow/cli/commands/daemon_utils.py", line 85, in run_command_with_daemon_option
        callback()
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 61, in <lambda>
        callback=lambda: _run_scheduler_job(args),
    (Python) File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 49, in _run_scheduler_job
        run_job(job=job_runner.job, execute_callable=job_runner._execute)
    (Python) File "/opt/airflow/airflow/utils/session.py", line 84, in wrapper
        return func(*args, session=session, **kwargs)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 410, in run_job
        return execute_job(job, execute_callable=execute_callable)
    (Python) File "/opt/airflow/airflow/jobs/job.py", line 439, in execute_job
        ret = execute_callable()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 847, in _execute
        self._run_scheduler_loop()
    (Python) File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 982, in _run_scheduler_loop
        self.job.executor.heartbeat()
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 240, in heartbeat
        self.trigger_tasks(open_slots)
    (Python) File "/opt/airflow/airflow/executors/base_executor.py", line 298, in trigger_tasks
        self._process_tasks(task_tuples)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 292, in _process_tasks
        key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
    (Python) File "/opt/airflow/airflow/providers/celery/executors/celery_executor.py", line 343, in _send_tasks_to_celery
        send_pool.map(send_task_to_executor, task_tuples_to_send, chunksize=chunksize)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 674, in map
        results = super().map(partial(_process_chunk, fn),
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 608, in map
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 608, in <listcomp>
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 645, in submit
        self._start_queue_management_thread()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 584, in _start_queue_management_thread
        self._adjust_process_count()
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 608, in _adjust_process_count
        p.start()
    (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 121, in start
        self._popen = self._Popen(self)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/context.py", line 277, in _Popen
        return Popen(process_obj)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
        self._launch(process_obj)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 75, in _launch
        code = process_obj._bootstrap(parent_sentinel=child_r)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
    (Python) File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
        call_item = call_queue.get(block=True)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/queues.py", line 97, in get
        res = self._recv_bytes()
    (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
        buf = self._recv_bytes(maxlength)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
        buf = self._recv(4)
    (Python) File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
        chunk = read(handle, remaining)
```

This suggests the bug probably has something to do with file descriptors not being closed properly. I did not follow the whole logic of how the scheduler spawns child processes, but before the scheduler gets stuck it spawns some child processes that close properly and do not cause the issue.
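Both stacks fit a well-known CPython failure mode: `fork()` copies the state of every lock and file descriptor into the child but copies only the forking thread, so a child forked at the wrong moment (while another thread holds a lock or has a socket half-open) can block forever. This is only a hypothesis about the mechanism here, but a minimal, self-contained sketch (no Airflow or Celery involved) shows it:

```python
from __future__ import annotations

import os
import threading
import time

lock = threading.Lock()


def background_worker() -> None:
    # Simulates e.g. an HTTP client thread that repeatedly holds a lock.
    while True:
        with lock:
            time.sleep(0.001)


threading.Thread(target=background_worker, daemon=True).start()
time.sleep(0.1)  # let the worker start grabbing the lock

pid = os.fork()  # the child gets only this thread, but ALL lock states
if pid == 0:
    # Child: if the fork happened while background_worker held the lock,
    # the lock is locked forever here -- nobody in the child will release it.
    lock.acquire()  # may block indefinitely
    print("child acquired lock (got lucky with fork timing)")
    os._exit(0)
else:
    os.waitpid(pid, 0)  # the parent may then wait forever on the stuck child
```

In the scheduler's case, the extra threads would be the ones emitting OpenLineage HTTP events, which could also explain the sockets piling up in `CLOSE_WAIT` after they are inherited by the forked workers.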
### How to reproduce

1. Set up breeze.

2. Create a `dynamic_dag.jinja2` template:

    ```python
    from __future__ import annotations

    from pendulum import datetime

    from airflow.decorators import dag, task


    @task
    def return_string(path):
        return path


    @dag(
        start_date=datetime(2023, 1, 1),
        max_active_runs={{ max_active_runs }},
        schedule=None,
        catchup=False,
    )
    def dynamic_dag_{{ number }}():
        return_string("whatever")


    dynamic_dag_{{ number }}()
    ```

3. Create DAGs in the `files/dags` Airflow directory, e.g. with the following script:

    ```python
    from __future__ import annotations

    import pathlib

    import jinja2

    DAGS_LOCATION = pathlib.Path.home() / "airflow" / "files" / "dags"  # change if airflow is in another location
    TEMPLATE_LOCATION = pathlib.Path(__file__).resolve().parent / "dynamic_dag.jinja2"  # this should point to the template file
    NO_OF_DAGS = 10
    MAX_ACTIVE_RUNS = 100

    if __name__ == "__main__":
        jinja_env = jinja2.Environment()
        template = jinja_env.from_string(TEMPLATE_LOCATION.read_text())
        for i in range(NO_OF_DAGS):
            with open(DAGS_LOCATION / f"dynamic_dag_{i}.py", "w") as f:
                f.write(template.render(number=i, max_active_runs=MAX_ACTIVE_RUNS))
    ```
4. Create a `trigger_dag` DAG in `files/dags`:

    ```python
    from __future__ import annotations

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.api.common.trigger_dag import trigger_dag
    from airflow.models.dag import DagModel
    from airflow.models.dagrun import DagRun
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import PythonOperator
    from airflow.utils import timezone
    from airflow.utils.types import DagRunType

    default_args = {
        "owner": "airflow",
        "start_date": datetime(2023, 1, 1),
        "depends_on_past": False,
    }

    NO_OF_DAGS = 10
    NO_OF_TRIGGERS_PER_DAG = 100


    def trigger(**kwargs):
        dag_runs = []

        def trigger_dags(x):
            for i in range(NO_OF_DAGS):
                trigger_dag_id = f"dynamic_dag_{i}"
                parsed_execution_date = timezone.utcnow()
                dag_model = DagModel.get_dagmodel(trigger_dag_id)
                if not dag_model:
                    continue
                dag_model.set_is_paused(False)
                run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
                dag_run = trigger_dag(
                    dag_id=trigger_dag_id,
                    run_id=run_id + str(x),
                    execution_date=parsed_execution_date,
                    replace_microseconds=False,
                )
                dag_runs.append(dag_run)

        for i in range(NO_OF_TRIGGERS_PER_DAG):
            trigger_dags(i)


    # Main DAG that will trigger the dynamic DAGs
    with DAG(
        "trigger_dag",
        default_args=default_args,
        schedule_interval=timedelta(days=1),  # adjust as needed
        catchup=False,
    ) as dag:
        start = EmptyOperator(task_id="start")
        trigger_dags = PythonOperator(task_id="trigger_dags", python_callable=trigger)
        start >> trigger_dags
    ```

5. Host a local server (skip this if you want to use some other one, e.g. `httpbin.org`):

    1. Flask server in `app.py`:

        ```python
        from __future__ import annotations

        import sys
        import time

        from flask import Flask, Response, request

        app = Flask(__name__)


        @app.route("/", defaults={"path": ""})
        @app.route("/<path:path>", methods=["GET", "HEAD", "POST", "PUT", "DELETE"])
        def catch_all(path):
            # time.sleep(4)  # uncomment to delay the response
            if request.is_json:
                print(request.json, file=sys.stderr)
            return Response("You want path: %s" % path, status=502)  # change the status if needed


        if __name__ == "__main__":
            app.run()
        ```

    2. `requirements.txt`:

        ```
        flask==2.3.2
        python-dateutil==2.8.2
        gunicorn
        ```

    3. `Dockerfile`:

        ```docker
        FROM python:3-alpine AS builder
        WORKDIR /app

        RUN python3 -m venv venv
        ENV VIRTUAL_ENV=/app/venv
        ENV PATH="$VIRTUAL_ENV/bin:$PATH"

        COPY requirements.txt .
        RUN pip install -r requirements.txt

        # Stage 2
        FROM python:3-alpine AS runner
        WORKDIR /app

        COPY --from=builder /app/venv venv
        COPY app.py app.py

        ENV VIRTUAL_ENV=/app/venv
        ENV PATH="$VIRTUAL_ENV/bin:$PATH"
        ENV FLASK_APP=app/app.py
        EXPOSE 8080

        CMD ["gunicorn", "--bind", ":8080", "--workers", "2", "app:app"]
        ```

    4. Build the image with `docker build . -t test-server`.

    5. Run it with `docker run -e FLASK_RUN_PORT=5000 -e FLASK_APP=app.py --rm -p 5000:8080 test-server`.

6. In `files/airflow-breeze-config/variables.env` put the following entries:

    ```
    AIRFLOW__CORE__LAZY_DISCOVER_PROVIDERS=False
    OPENLINEAGE_URL=http://host.docker.internal:5000
    AIRFLOW__CORE__LOGGING_LEVEL=DEBUG  # if you want DEBUG logs
    ```

    To use `httpbin.org` instead, set the following instead of `OPENLINEAGE_URL`:

    ```
    AIRFLOW__OPENLINEAGE__TRANSPORT={"type":"http","url":"https://httpbin.org/status/401","endpoint":""}
    ```

7. Run breeze with `breeze start-airflow --executor CeleryExecutor` and trigger `trigger_dag`. Wait until the scheduler hangs and DAGs get stuck in the queued/running state.

### Anything else

The issue is reproducible with a considerable number of concurrent DagRuns (100+ in my case; it may depend on the setup).
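While waiting for the hang, the growth of stuck sockets can also be watched from inside the scheduler container with a few lines of Python instead of `lsof`. This watcher only assumes `psutil` is importable (it is a regular Airflow dependency) and may need root to see sockets belonging to other processes:

```python
from __future__ import annotations

import time

import psutil


def count_close_wait() -> int:
    """Count sockets stuck in CLOSE_WAIT across all visible processes."""
    return sum(
        1
        for conn in psutil.net_connections(kind="inet")
        if conn.status == psutil.CONN_CLOSE_WAIT
    )


if __name__ == "__main__":
    while True:
        print(f"CLOSE_WAIT sockets: {count_close_wait()}")
        time.sleep(5)
```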
I also did some tests with raw `threading` only and/or with the OpenLineage dependencies skipped entirely (sending requests directly from the `on_dag_run_...` hooks).

I am willing to submit a PR that potentially fixes the bug: the change is to use a `ProcessPoolExecutor` instead of a `ThreadPoolExecutor`. This seems to fix it completely without breaking anything else. I eventually could not find the root cause; my gut says it is some combination of Python threading and a Celery dependency on file descriptors that produces the bug. A sketch of the idea follows below.
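To be clear, this is only a sketch of the shape of that change, not the actual patch; it assumes the pool lives in the OpenLineage listener that runs the `on_dag_run_...` hooks, and the class and helper names are illustrative stand-ins:

```python
from __future__ import annotations

from concurrent.futures import ProcessPoolExecutor  # was: ThreadPoolExecutor


def _emit(event_payload: dict) -> None:
    # Placeholder for the HTTP emission done by the OpenLineage client.
    ...


class ListenerSketch:
    """Illustrative only: submit event emission to worker processes
    instead of threads inside the scheduler process."""

    def __init__(self) -> None:
        # was: self._executor = ThreadPoolExecutor(max_workers=8)
        self._executor = ProcessPoolExecutor(max_workers=8)

    def on_dag_run_running(self, event_payload: dict) -> None:
        # Hypothesis: with no emitter threads in the scheduler process,
        # its later fork()s cannot inherit their locks or half-open sockets.
        self._executor.submit(_emit, event_payload)
```

The trade-off is that payloads must be picklable and each emission pays process-pool overhead, but it keeps the emitting threads and their sockets out of the process that forks Celery workers.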
### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)