This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a3381572c92 fix flakiness (#52936)
a3381572c92 is described below
commit a3381572c9223250f2315c5db3429ed55d491da9
Author: Christos Bisias <[email protected]>
AuthorDate: Sun Jul 6 16:34:32 2025 +0300
fix flakiness (#52936)
---
airflow-core/tests/integration/otel/test_otel.py | 25 +++++++++++-------------
1 file changed, 11 insertions(+), 14 deletions(-)
diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index 41685f9b66e..99e99c39418 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -873,6 +873,11 @@ class TestOtelIntegration:
will handle the rest of the dag processing. The paused thread will be
resumed afterwards.
"""
+ # For this test, scheduler1 must be idle but still considered healthy by scheduler2.
+ # If scheduler2 marks the job as unhealthy, then it will recreate scheduler1's spans
+ # because it will consider them lost.
+ os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = "90"
+
celery_worker_process = None
scheduler_process_1 = None
apiserver_process = None
@@ -937,13 +942,12 @@ class TestOtelIntegration:
with open(self.control_file, "w") as file:
file.write("continue")
- # Wait for scheduler2 to be up and running.
- time.sleep(10)
-
wait_for_dag_run_and_check_span_status(
dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.SHOULD_END
)
+ # Stop scheduler2 in case it still has a db lock on the dag_run.
+ scheduler_process_2.terminate()
scheduler_process_1.send_signal(signal.SIGCONT)
# Wait for the scheduler to start again and continue running.
@@ -959,6 +963,9 @@ class TestOtelIntegration:
with create_session() as session:
dump_airflow_metadata_db(session)
+ # Reset for the rest of the tests.
+ os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = "15"
+
# Terminate the processes.
celery_worker_process.terminate()
celery_worker_process.wait()
@@ -969,7 +976,6 @@ class TestOtelIntegration:
apiserver_process.terminate()
apiserver_process.wait()
- scheduler_process_2.terminate()
scheduler_process_2.wait()
out, err = capfd.readouterr()
@@ -991,7 +997,6 @@ class TestOtelIntegration:
"""
celery_worker_process = None
- scheduler_process_1 = None
apiserver_process = None
scheduler_process_2 = None
try:
@@ -1032,7 +1037,7 @@ class TestOtelIntegration:
with capfd.disabled():
scheduler_process_1.terminate()
- assert scheduler_process_1.wait(timeout=30) == 0
+ assert scheduler_process_1.wait() == 0
check_dag_run_state_and_span_status(
dag_id=dag_id, run_id=run_id, state=State.RUNNING, span_status=SpanStatus.NEEDS_CONTINUANCE
@@ -1049,9 +1054,6 @@ class TestOtelIntegration:
with open(self.control_file, "w") as file:
file.write("continue")
- # Wait for scheduler2 to be up and running.
- time.sleep(10)
-
wait_for_dag_run_and_check_span_status(
dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.ENDED
)
@@ -1066,8 +1068,6 @@ class TestOtelIntegration:
celery_worker_process.terminate()
celery_worker_process.wait()
- scheduler_process_1.wait()
-
apiserver_process.terminate()
apiserver_process.wait()
@@ -1253,9 +1253,6 @@ class TestOtelIntegration:
stderr=None,
)
- # Wait for scheduler2 to be up and running.
- time.sleep(10)
-
wait_for_dag_run_and_check_span_status(
dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.ENDED
)