tirkarthi opened a new issue, #46048: URL: https://github.com/apache/airflow/issues/46048
### Apache Airflow version

main (development)

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

It seems the airflow dag processor opens sockets under the hood that are not closed properly, leading to too many open files after it runs for some time. To reproduce this, please follow the steps below, since it can take some time for the ulimit to be reached if it is unlimited or very high. There is a relevant comment in the source code: https://github.com/apache/airflow/blob/29b9e8ea10de7a82ad40a7a2160c64a84004a45e/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L345-L361

### What you think should happen instead?

The number of open files should not keep increasing, and the files that are opened should be closed. The `ResourceWarning` messages about unclosed sockets, which point to the problem, should also be fixed:

```
2025-01-26 09:20:05 [debug ] Workload process exited 1 [supervisor] exit_code=0
[2025-01-26T09:20:05.502+0530] {dag.py:1841} INFO - Sync 1 DAGs
[2025-01-26T09:20:05.510+0530] {dag.py:2398} INFO - Setting next_dagrun for example_setup_teardown_taskflow to None, run_after=None
/usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=320, family=1, type=1, proto=0>
```

### How to reproduce

1. Set `min_file_process_interval = 10` in airflow.cfg to trigger frequent reparsing.
2. Run `PYTHONWARNINGS=always python -X dev -m airflow dag-processor`.
3. Use `ps aux | grep -i dag-processor` to get the pid.
4. Run `watch "ls -1 /proc/<pid>/fd | wc"` and observe that the count keeps increasing.
5.
   In another tab, set the open-files limit for the process using `prlimit --pid <pid> --nofile=1024:1024`. Once the limit is reached, the dag processor exits with the following stack trace:

```
/usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=1010, family=1, type=1, proto=0>
[2025-01-26T09:12:56.322+0530] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
    self._start_new_processes()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
    processor = self._create_process(file_path)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
    return DagFileProcessorProcess.start(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
    child_logs, read_logs = mkpipe()
                            ^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
    rsock, wsock = socketpair()
                   ^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 657, in socketpair
    a, b = _socket.socketpair(family, type, proto)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 24] Too many open files

[2025-01-26T09:12:56.332+0530] {process_utils.py:266} INFO - Waiting up to 5 seconds for processes to exit...
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 62, in <module>
    main()
  File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
    args.func(args)
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 54, in dag_processor
    run_command_with_daemon_option(
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 57, in <lambda>
    callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
    return self._run_parsing_loop()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
    self._start_new_processes()
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
    processor = self._create_process(file_path)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
    return DagFileProcessorProcess.start(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/processor.py", line 212, in start
    proc: Self = super().start(target=target, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
    child_logs, read_logs = mkpipe()
                            ^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
    rsock, wsock = socketpair()
                   ^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 657, in socketpair
    a, b = _socket.socketpair(family, type, proto)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 24] Too many open files
```

### Operating System

Ubuntu 20.04.3 LTS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
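To illustrate the failure mode in isolation (this is not Airflow code, just a standalone sketch): each `socket.socketpair()` that is created but never closed pins two file descriptors in the process, which matches the growing count observed in `/proc/<pid>/fd`.

```python
import os
import socket

def open_fd_count():
    """Count this process's open fds (Linux-only; /proc may be absent elsewhere)."""
    try:
        return len(os.listdir("/proc/self/fd"))
    except FileNotFoundError:
        return None

before = open_fd_count()

# Mimic a pipe being created once per parsed DAG file without cleanup:
# every socketpair holds two fds until both ends are explicitly closed.
pairs = [socket.socketpair() for _ in range(10)]
leaked = open_fd_count()

# Closing both ends releases the descriptors again.
for rsock, wsock in pairs:
    rsock.close()
    wsock.close()
after = open_fd_count()
```

On Linux this should show `leaked - before == 20` (two descriptors per pair) and `after == before` once both ends are closed, which is the kind of cleanup the growing fd count suggests is missing for the pipes created in `mkpipe()`.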
### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
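For reference, a hedged sketch of the cleanup pattern the `ResourceWarning`s point at, under the assumption that the supervisor hands one end of each socketpair to a forked child for log forwarding (the function name `relay_from_child` is illustrative, not Airflow's API): the parent should close its copy of the child's end right after forking, and close its own end once the child is reaped.

```python
import os
import socket

def relay_from_child():
    # Illustrative only: models a supervisor handing the write end of a
    # socketpair to a forked child, as a pipe built from socketpair() implies.
    rsock, wsock = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child: keep the write end, drop the read end.
        rsock.close()
        wsock.sendall(b"log line")
        wsock.close()
        os._exit(0)
    # Parent: close its copy of the write end immediately after forking;
    # keeping it open leaks one fd per child and prevents recv() from
    # ever seeing EOF.
    wsock.close()
    data = rsock.recv(1024)
    os.waitpid(pid, 0)
    # Close the read end once the child is reaped, or it leaks too.
    rsock.close()
    return data
```

Without the two `close()` calls in the parent, each spawned child would permanently consume descriptors in the supervising process, matching the symptom reported above.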