This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fce4ca9b93 Attempt to battle flaky DAGFileProcessorAgent test (#38289)
fce4ca9b93 is described below

commit fce4ca9b93ca44e0d6955f4d6857d66e78818501
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Mar 19 21:25:12 2024 +0100

    Attempt to battle flaky DAGFileProcessorAgent test (#38289)
    
    Failed several times recently.
---
 tests/dag_processing/test_job_runner.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/tests/dag_processing/test_job_runner.py 
b/tests/dag_processing/test_job_runner.py
index d77f7692d3..683994f410 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -1370,6 +1370,17 @@ class TestDagProcessorJobRunner:
         ]
 
 
+def _wait_for_processor_agent_to_complete_in_async_mode(processor_agent: 
DagFileProcessorAgent):
+    start_timer = time.monotonic()
+    while time.monotonic() - start_timer < 10:
+        if processor_agent.done and all(
+            [processor.done for processor in 
processor_agent._processors.values()]
+        ):
+            break
+        processor_agent.heartbeat()
+        time.sleep(0.1)
+
+
 class TestDagFileProcessorAgent:
     def setup_method(self):
         # Make sure that the configure_logging is not cached
@@ -1644,6 +1655,8 @@ class TestDagFileProcessorAgent:
             processor_agent.run_single_parsing_loop()
 
         processor_agent._process.join()
+        if async_mode:
+            
_wait_for_processor_agent_to_complete_in_async_mode(processor_agent)
 
         # Capture the stdout and stderr
         out, _ = capfd.readouterr()
@@ -1661,6 +1674,8 @@ class TestDagFileProcessorAgent:
             processor_agent.run_single_parsing_loop()
 
         processor_agent._process.join()
+        if async_mode:
+            
_wait_for_processor_agent_to_complete_in_async_mode(processor_agent)
 
         # Capture the stdout and stderr
         out, _ = capfd.readouterr()

Reply via email to