This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fce4ca9b93 Attempt to battle flaky DAGFileProcessorAgent test (#38289)
fce4ca9b93 is described below
commit fce4ca9b93ca44e0d6955f4d6857d66e78818501
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Mar 19 21:25:12 2024 +0100
Attempt to battle flaky DAGFileProcessorAgent test (#38289)
Failed several times recently.
---
tests/dag_processing/test_job_runner.py | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index d77f7692d3..683994f410 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -1370,6 +1370,17 @@ class TestDagProcessorJobRunner:
]
+def _wait_for_processor_agent_to_complete_in_async_mode(processor_agent:
DagFileProcessorAgent):
+ start_timer = time.monotonic()
+ while time.monotonic() - start_timer < 10:
+ if processor_agent.done and all(
+ [processor.done for processor in
processor_agent._processors.values()]
+ ):
+ break
+ processor_agent.heartbeat()
+ time.sleep(0.1)
+
+
class TestDagFileProcessorAgent:
def setup_method(self):
# Make sure that the configure_logging is not cached
@@ -1644,6 +1655,8 @@ class TestDagFileProcessorAgent:
processor_agent.run_single_parsing_loop()
processor_agent._process.join()
+ if async_mode:
+
_wait_for_processor_agent_to_complete_in_async_mode(processor_agent)
# Capture the stdout and stderr
out, _ = capfd.readouterr()
@@ -1661,6 +1674,8 @@ class TestDagFileProcessorAgent:
processor_agent.run_single_parsing_loop()
processor_agent._process.join()
+ if async_mode:
+
_wait_for_processor_agent_to_complete_in_async_mode(processor_agent)
# Capture the stdout and stderr
out, _ = capfd.readouterr()