potiuk opened a new issue, #31575: URL: https://github.com/apache/airflow/issues/31575
### Body When the tests are run on Public runners, they are quite often (especially with MSSQL) failing with Timeout when executing tests with `run_processor_manager_one_loop`. The problem is apparently a race condition in our `run_processor_manager_one_loop` test function: https://github.com/apache/airflow/blob/main/tests/dag_processing/test_job_runner.py#L130 This method is supposed to - apparenlty - perform a single processor manager loop and it should complete quickly, but apparently, processor manager runs in continuous loops when the race condition occurs. There is a bit of multiprocessing involved in that method but it looks like somewhere the message `DagParsingStat` that indicates that the parsing has completed is lost. This might also indicate a real race condition in the Scheduler / DagFileProcessor communication so might be worth investigating. Example failure: https://github.com/apache/airflow/actions/runs/5095671634/jobs/9161369711 ``` ______ TestDagProcessorJobRunner.test_send_file_processing_statsd_timing _______ self = <tests.dag_processing.test_job_runner.TestDagProcessorJobRunner object at 0x7efde4a55750> statsd_timing_mock = <MagicMock name='timing' id='139628515583184'> tmpdir = local('/tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0') @conf_vars({("core", "load_examples"): "False"}) @mock.patch("airflow.dag_processing.manager.Stats.timing") def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir): filename_to_parse = tmpdir / "temp_dag.py" dag_code = dedent( """ from airflow import DAG dag = DAG(dag_id='temp_dag', schedule='0 0 * * *') """ ) with open(filename_to_parse, "w") as file_to_parse: file_to_parse.writelines(dag_code) child_pipe, parent_pipe = multiprocessing.Pipe() async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn") manager = DagProcessorJobRunner( job=Job(), processor=DagFileProcessorManager( dag_directory=tmpdir, max_runs=1, processor_timeout=timedelta(days=365), signal_conn=child_pipe, dag_ids=[], pickle_dags=False, async_mode=async_mode, ), ) > self.run_processor_manager_one_loop(manager, parent_pipe) tests/dag_processing/test_job_runner.py:924: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ tests/dag_processing/test_job_runner.py:137: in run_processor_manager_one_loop manager.processor._run_parsing_loop() airflow/dag_processing/manager.py:575: in _run_parsing_loop ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time) /usr/local/lib/python3.7/multiprocessing/connection.py:921: in wait ready = selector.select(timeout) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <selectors.PollSelector object at 0x7efdcc247950>, timeout = 991 def select(self, timeout=None): # This is shared between poll() and epoll(). # epoll() has a different signature and handling of timeout parameter. if timeout is None: timeout = None elif timeout <= 0: timeout = 0 else: # poll() has a resolution of 1 millisecond, round away from # zero to wait *at least* timeout seconds. timeout = math.ceil(timeout * 1e3) ready = [] try: > fd_event_list = self._selector.poll(timeout) E Failed: Timeout >60.0s /usr/local/lib/python3.7/selectors.py:415: Failed ``` STDOUT call: ``` /usr/local/lib/python3.7/selectors.py:415: Failed ----------------------------- Captured stdout call ----------------------------- [2023-05-27T00:42:17.791+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:17.792+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:17.855+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:17.857+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:17.896+0000] {manager.py:889} INFO - ================================================================================ DAG File Processing Stats File Path PID Runtime # DAGs # Errors Last Runtime Last Run ------------------------------------------------------------------------ ----- --------- -------- ---------- -------------- ---------- /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0/temp_dag.py 168 0.07s 0 0 ================================================================================ [2023-05-27T00:42:18.860+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:18.861+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:18.902+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:18.902+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:19.901+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:19.902+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:19.923+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:19.924+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:20.938+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:20.938+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:20.969+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:20.969+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:22.019+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:22.020+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:22.057+0000] {manager.py:758} INFO - Searching for files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 [2023-05-27T00:42:22.058+0000] {manager.py:761} INFO - There are 1 files in /tmp/pytest-of-root/pytest-0/test_send_file_processing_stat0 ``` Resulting with the timeout. ### Committer - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
