tirkarthi opened a new issue, #46048:
URL: https://github.com/apache/airflow/issues/46048

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   It seems the airflow dag processor opens sockets under the hood that are not closed properly, leading to too many open files after it runs for some time. To reproduce this, please follow the steps below, since it takes some time for the ulimit to be reached in case it's unlimited or very high.
   
   From reading the source code, there is a relevant comment here:
   
   
https://github.com/apache/airflow/blob/29b9e8ea10de7a82ad40a7a2160c64a84004a45e/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L345-L361
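
   The linked `mkpipe()` builds each log channel from `socket.socketpair()`. A minimal standalone sketch (illustrative only, not Airflow's actual implementation) shows the lifetime issue: each pair consumes two file descriptors, and unless both ends are explicitly closed they stay open until the process hits its fd limit.

```python
import socket

def mkpipe_sketch():
    # Illustrative stand-in for supervisor.mkpipe(): every call costs
    # two file descriptors, one per end of the pair.
    rsock, wsock = socket.socketpair()
    # Returning both ends makes the caller responsible for closing them;
    # if neither end is closed, the fds accumulate across calls.
    return rsock, wsock

rsock, wsock = mkpipe_sketch()
assert rsock.fileno() >= 0 and wsock.fileno() >= 0  # two live fds

# Closing both ends releases the descriptors; fileno() becomes -1.
rsock.close()
wsock.close()
assert rsock.fileno() == -1 and wsock.fileno() == -1
```

   Running such code under `python -X dev` without the `close()` calls is what produces `ResourceWarning: unclosed <socket.socket ...>` messages like the ones quoted below.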
   
   
   ### What you think should happen instead?
   
   The number of open files should not keep increasing; each file that is opened should be closed. The socket `ResourceWarning`s, which likely point at the problem, should be fixed.
   
   ```
   2025-01-26 09:20:05 [debug    ] Workload process exited 1      [supervisor] exit_code=0
   [2025-01-26T09:20:05.502+0530] {dag.py:1841} INFO - Sync 1 DAGs
   [2025-01-26T09:20:05.510+0530] {dag.py:2398} INFO - Setting next_dagrun for example_setup_teardown_taskflow to None, run_after=None
   /usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=320, family=1, type=1, proto=0>
   ```
   
   ### How to reproduce
   
   1. Set `min_file_process_interval = 10` in airflow.cfg to trigger frequent reparsing.
   2. Run `PYTHONWARNINGS=always python -X dev -m airflow dag-processor`.
   3. Use `ps aux | grep -i dag-processor` to get the PID.
   4. Run `watch "ls -1 /proc/<pid>/fd | wc"` and note that the file-descriptor count keeps increasing.
   5. In another tab, set the open-files limit for the process using `prlimit --pid <pid> --nofile=1024:1024`.
   
   Once the limit is reached, the dag processor exits with the following stack trace:
   
   ```
   /usr/lib/python3.11/socket.py:789 ResourceWarning: unclosed <socket.socket fd=1010, family=1, type=1, proto=0>
   [2025-01-26T09:12:56.322+0530] {dag_processor_job_runner.py:63} ERROR - Exception when executing DagProcessorJob
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
       self.processor.run()
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
       return self._run_parsing_loop()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
       self._start_new_processes()
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
       processor = self._create_process(file_path)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
       return DagFileProcessorProcess.start(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/processor.py", line 212, in start
       proc: Self = super().start(target=target, **kwargs)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
       child_logs, read_logs = mkpipe()
                               ^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
       rsock, wsock = socketpair()
                      ^^^^^^^^^^^^
     File "/usr/lib/python3.11/socket.py", line 657, in socketpair
       a, b = _socket.socketpair(family, type, proto)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   OSError: [Errno 24] Too many open files
   [2025-01-26T09:12:56.332+0530] {process_utils.py:266} INFO - Waiting up to 5 seconds for processes to exit...
   Traceback (most recent call last):
     File "<frozen runpy>", line 198, in _run_module_as_main
     File "<frozen runpy>", line 88, in _run_code
     File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 62, in <module>
       main()
     File "/home/karthikeyan/stuff/python/airflow/airflow/__main__.py", line 58, in main
       args.func(args)
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/cli_config.py", line 49, in command
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/cli.py", line 111, in wrapper
       return f(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 54, in dag_processor
       run_command_with_daemon_option(
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/dag_processor_command.py", line 57, in <lambda>
       callback=lambda: run_job(job=job_runner.job, execute_callable=job_runner._execute),
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
       return func(*args, session=session, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
       return execute_job(job, execute_callable=execute_callable)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
       ret = execute_callable()
             ^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
       self.processor.run()
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 231, in run
       return self._run_parsing_loop()
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 315, in _run_parsing_loop
       self._start_new_processes()
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 779, in _start_new_processes
       processor = self._create_process(file_path)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/manager.py", line 763, in _create_process
       return DagFileProcessorProcess.start(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/airflow/dag_processing/processor.py", line 212, in start
       proc: Self = super().start(target=target, **kwargs)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 343, in start
       child_logs, read_logs = mkpipe()
                               ^^^^^^^^
     File "/home/karthikeyan/stuff/python/airflow/.env/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 132, in mkpipe
       rsock, wsock = socketpair()
                      ^^^^^^^^^^^^
     File "/usr/lib/python3.11/socket.py", line 657, in socketpair
       a, b = _socket.socketpair(family, type, proto)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   OSError: [Errno 24] Too many open files
   ```
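
   The failure mode in the trace can be reproduced in isolation. The sketch below (illustrative, using only the stdlib; the limit of 64 is arbitrary) lowers this process's `RLIMIT_NOFILE` the way step 5 does with `prlimit`, then keeps calling `socket.socketpair()` until it fails with the same `[Errno 24] Too many open files`:

```python
import errno
import resource
import socket

# Lower the soft open-files limit for this process (analogous to
# `prlimit --pid <pid> --nofile=...` in the repro steps).
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (64, hard))

pairs = []
caught = None
try:
    # Each socketpair() consumes two fds; well over the 64-fd budget.
    for _ in range(200):
        pairs.append(socket.socketpair())
except OSError as exc:
    caught = exc
finally:
    # Closing the leaked ends releases the fds and clears the condition.
    for r, w in pairs:
        r.close()
        w.close()
    resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))

assert caught is not None and caught.errno == errno.EMFILE  # Errno 24
```

   This is the same sequence the dag processor goes through: unclosed socketpairs accumulate each parsing cycle until `socketpair()` raises `EMFILE`.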
   
   ### Operating System
   
   Ubuntu 20.04.3 LTS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

