This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a3381572c92 fix flakiness (#52936)
a3381572c92 is described below

commit a3381572c9223250f2315c5db3429ed55d491da9
Author: Christos Bisias <[email protected]>
AuthorDate: Sun Jul 6 16:34:32 2025 +0300

    fix flakiness (#52936)
---
 airflow-core/tests/integration/otel/test_otel.py | 25 +++++++++++-------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index 41685f9b66e..99e99c39418 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -873,6 +873,11 @@ class TestOtelIntegration:
         will handle the rest of the dag processing. The paused thread will be 
resumed afterwards.
         """
 
+        # For this test, scheduler1 must be idle but still considered healthy 
by scheduler2.
+        # If scheduler2 marks the job as unhealthy, then it will recreate 
scheduler1's spans
+        # because it will consider them lost.
+        os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = 
"90"
+
         celery_worker_process = None
         scheduler_process_1 = None
         apiserver_process = None
@@ -937,13 +942,12 @@ class TestOtelIntegration:
             with open(self.control_file, "w") as file:
                 file.write("continue")
 
-            # Wait for scheduler2 to be up and running.
-            time.sleep(10)
-
             wait_for_dag_run_and_check_span_status(
                 dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.SHOULD_END
             )
 
+            # Stop scheduler2 in case it still has a db lock on the dag_run.
+            scheduler_process_2.terminate()
             scheduler_process_1.send_signal(signal.SIGCONT)
 
             # Wait for the scheduler to start again and continue running.
@@ -959,6 +963,9 @@ class TestOtelIntegration:
                 with create_session() as session:
                     dump_airflow_metadata_db(session)
 
+            # Reset for the rest of the tests.
+            os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] 
= "15"
+
             # Terminate the processes.
             celery_worker_process.terminate()
             celery_worker_process.wait()
@@ -969,7 +976,6 @@ class TestOtelIntegration:
             apiserver_process.terminate()
             apiserver_process.wait()
 
-            scheduler_process_2.terminate()
             scheduler_process_2.wait()
 
         out, err = capfd.readouterr()
@@ -991,7 +997,6 @@ class TestOtelIntegration:
         """
 
         celery_worker_process = None
-        scheduler_process_1 = None
         apiserver_process = None
         scheduler_process_2 = None
         try:
@@ -1032,7 +1037,7 @@ class TestOtelIntegration:
             with capfd.disabled():
                 scheduler_process_1.terminate()
 
-            assert scheduler_process_1.wait(timeout=30) == 0
+            assert scheduler_process_1.wait() == 0
 
             check_dag_run_state_and_span_status(
                 dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.NEEDS_CONTINUANCE
@@ -1049,9 +1054,6 @@ class TestOtelIntegration:
             with open(self.control_file, "w") as file:
                 file.write("continue")
 
-            # Wait for scheduler2 to be up and running.
-            time.sleep(10)
-
             wait_for_dag_run_and_check_span_status(
                 dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
             )
@@ -1066,8 +1068,6 @@ class TestOtelIntegration:
             celery_worker_process.terminate()
             celery_worker_process.wait()
 
-            scheduler_process_1.wait()
-
             apiserver_process.terminate()
             apiserver_process.wait()
 
@@ -1253,9 +1253,6 @@ class TestOtelIntegration:
                 stderr=None,
             )
 
-            # Wait for scheduler2 to be up and running.
-            time.sleep(10)
-
             wait_for_dag_run_and_check_span_status(
                 dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
             )

Reply via email to