potiuk opened a new issue, #31575:
URL: https://github.com/apache/airflow/issues/31575

   ### Body
   
   When the tests are run on public runners, they quite often fail (especially with MSSQL) with a timeout while executing tests that use `run_processor_manager_one_loop`.
   
   The problem is apparently a race condition in our 
`run_processor_manager_one_loop` test function:
   
   
https://github.com/apache/airflow/blob/main/tests/dag_processing/test_job_runner.py#L130
   
   This method is supposed to perform a single processor-manager loop and should complete quickly, but when the race condition occurs the processor manager keeps running loop after loop instead.
   
   There is a bit of multiprocessing involved in that method, and it looks like the `DagParsingStat` message that indicates parsing has completed is lost somewhere along the way.
   
   This might also indicate a real race condition in the Scheduler / DagFileProcessor communication, so it might be worth investigating.
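   As a rough illustration of the failure mode (not Airflow's actual implementation — `worker`, `wait_one_loop`, and the `"parsing_done"` string are hypothetical stand-ins for the manager process and `DagParsingStat`), the pattern looks like this: the parent polls a pipe for a completion message, and if that message is never delivered, the poll can only time out:

   ```python
import multiprocessing
import multiprocessing.connection


def worker(conn, send_stat):
    # Hypothetical stand-in for one DagFileProcessorManager iteration:
    # "parse", then report completion over the pipe. Passing send_stat=False
    # models the suspected race where the completion message never arrives.
    if send_stat:
        conn.send("parsing_done")  # plays the role of DagParsingStat


def wait_one_loop(conn, timeout=1.0):
    # Shaped like the test helper: poll the pipe for the completion message,
    # but give up after `timeout` seconds instead of spinning forever.
    ready = multiprocessing.connection.wait([conn], timeout=timeout)
    if not ready:
        # Message lost: the real test keeps looping until pytest's 60s timeout.
        return None
    return conn.recv()


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn, True))
    p.start()
    print(wait_one_loop(parent_conn))
    p.join()
   ```

   In the lost-message case the parent never sees anything on the pipe, which matches the captured traceback: the test is stuck inside `multiprocessing.connection.wait` / `selectors.poll` when pytest's 60-second timeout fires.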
   
   Example failure:
   
   https://github.com/apache/airflow/actions/runs/5095671634/jobs/9161369711
   
   
   ```
   ______ TestDagProcessorJobRunner.test_send_file_processing_statsd_timing 
_______
     
     self = <tests.dag_processing.test_job_runner.TestDagProcessorJobRunner 
object at 0x7efde4a55750>
     statsd_timing_mock = <MagicMock name='timing' id='139628515583184'>
     tmpdir = 
local('/tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0')
     
         @conf_vars({("core", "load_examples"): "False"})
         @mock.patch("airflow.dag_processing.manager.Stats.timing")
         def test_send_file_processing_statsd_timing(self, statsd_timing_mock, 
tmpdir):
             filename_to_parse = tmpdir / "temp_dag.py"
             dag_code = dedent(
                 """
             from airflow import DAG
             dag = DAG(dag_id='temp_dag', schedule='0 0 * * *')
             """
             )
             with open(filename_to_parse, "w") as file_to_parse:
                 file_to_parse.writelines(dag_code)
         
             child_pipe, parent_pipe = multiprocessing.Pipe()
         
             async_mode = "sqlite" not in conf.get("database", 
"sql_alchemy_conn")
             manager = DagProcessorJobRunner(
                 job=Job(),
                 processor=DagFileProcessorManager(
                     dag_directory=tmpdir,
                     max_runs=1,
                     processor_timeout=timedelta(days=365),
                     signal_conn=child_pipe,
                     dag_ids=[],
                     pickle_dags=False,
                     async_mode=async_mode,
                 ),
             )
         
     >       self.run_processor_manager_one_loop(manager, parent_pipe)
     
     tests/dag_processing/test_job_runner.py:924: 
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     tests/dag_processing/test_job_runner.py:137: in 
run_processor_manager_one_loop
         manager.processor._run_parsing_loop()
     airflow/dag_processing/manager.py:575: in _run_parsing_loop
         ready = multiprocessing.connection.wait(self.waitables.keys(), 
timeout=poll_time)
     /usr/local/lib/python3.7/multiprocessing/connection.py:921: in wait
         ready = selector.select(timeout)
     _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ _ 
     
     self = <selectors.PollSelector object at 0x7efdcc247950>, timeout = 991
     
         def select(self, timeout=None):
             # This is shared between poll() and epoll().
             # epoll() has a different signature and handling of timeout 
parameter.
             if timeout is None:
                 timeout = None
             elif timeout <= 0:
                 timeout = 0
             else:
                 # poll() has a resolution of 1 millisecond, round away from
                 # zero to wait *at least* timeout seconds.
                 timeout = math.ceil(timeout * 1e3)
             ready = []
             try:
     >           fd_event_list = self._selector.poll(timeout)
     E           Failed: Timeout >60.0s
     
     /usr/local/lib/python3.7/selectors.py:415: Failed
   ```
   
   STDOUT call:
   
   ```
     /usr/local/lib/python3.7/selectors.py:415: Failed
     ----------------------------- Captured stdout call 
-----------------------------
     [2023-05-27T00:42:17.791+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:17.792+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:17.855+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:17.857+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:17.896+0000] {manager.py:889} INFO - 
     
================================================================================
     DAG File Processing Stats
     
     File Path                                                                  
 PID  Runtime      # DAGs    # Errors  Last Runtime    Last Run
     ------------------------------------------------------------------------  
-----  ---------  --------  ----------  --------------  ----------
     /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0/temp_dag.py   
 168  0.07s             0           0
     
================================================================================
     [2023-05-27T00:42:18.860+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:18.861+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:18.902+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:18.902+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:19.901+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:19.902+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:19.923+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:19.924+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:20.938+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:20.938+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:20.969+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:20.969+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:22.019+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:22.020+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:22.057+0000] {manager.py:758} INFO - Searching for files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
     [2023-05-27T00:42:22.058+0000] {manager.py:761} INFO - There are 1 files 
in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0
   ```
   
   The loop keeps repeating like this until the test hits the 60-second timeout.

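   One way the test helper could be hardened against this (a sketch only — `wait_for_completion` and the message values are hypothetical, not existing Airflow code) is to poll with a hard deadline, so a lost completion message surfaces as an explicit, descriptive failure instead of a hang deep inside `selectors.poll`:

   ```python
import time
import multiprocessing
import multiprocessing.connection


def wait_for_completion(conn, is_done, deadline_s=5.0):
    # Keep polling until the completion message arrives, but enforce a hard
    # deadline so a lost message fails fast with a clear error rather than
    # tripping the generic 60s pytest timeout.
    deadline = time.monotonic() + deadline_s
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("parsing-complete message never arrived")
        if multiprocessing.connection.wait([conn], timeout=remaining):
            msg = conn.recv()
            if is_done(msg):  # skip unrelated messages, e.g. per-file stats
                return msg
   ```

   This would not fix the underlying race, but it would make the failure mode obvious in CI output and keep the flaky runs short.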
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
