dstandish commented on code in PR #62554:
URL: https://github.com/apache/airflow/pull/62554#discussion_r2892566874


##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -1077,435 +488,36 @@ def test_same_scheduler_processing_the_entire_dag(
         log.info("out-start --\n%s\n-- out-end", out)
         log.info("err-start --\n%s\n-- err-end", err)
 
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans from the output.
-            check_spans_without_continuance(output=out, dag=dag)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_change_after_the_first_task_finishes(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The scheduler thread will be paused after the first task ends and a 
new scheduler process
-        will handle the rest of the dag processing. The paused thread will be 
resumed afterwards.
-        """
-
-        # For this test, scheduler1 must be idle but still considered healthy 
by scheduler2.
-        # If scheduler2 marks the job as unhealthy, then it will recreate 
scheduler1's spans
-        # because it will consider them lost.
-        os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = 
"90"
-
-        celery_worker_process = None
-        scheduler_process_1 = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_between_tasks"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            with capfd.disabled():
-                # When the scheduler1 thread is paused, capfd keeps trying to 
read the
-                # file descriptors for the process and ends up freezing the 
test.
-                # Temporarily disable capfd to avoid that.
-                scheduler_process_1.send_signal(signal.SIGSTOP)
-
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Start the 2nd scheduler immediately without any delay to avoid 
having the 1st scheduler
-            # marked as unhealthy. If that happens, then the 2nd will recreate 
the spans that the
-            # 1st scheduler started.
-            # The scheduler would also be considered unhealthy in case it was 
paused
-            # and the dag run continued running.
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.SHOULD_END
-            )
-
-            # Stop scheduler2 in case it still has a db lock on the dag_run.
-            scheduler_process_2.terminate()
-            scheduler_process_1.send_signal(signal.SIGCONT)
-
-            # Wait for the scheduler to start again and continue running.
-            time.sleep(10)
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=30, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Reset for the rest of the tests.
-            os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] 
= "15"
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            scheduler_process_1.terminate()
-            scheduler_process_1.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_for_paused_dag(output=out, dag=dag, 
is_recreated=False, check_t1_sub_spans=False)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(

Review Comment:
   The span only exists while the task process is running. Whenever the task 
process exits, the span will be emitted.
   
   If there's an OOM kill, there's nothing to be done anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to