dstandish commented on code in PR #62554:
URL: https://github.com/apache/airflow/pull/62554#discussion_r2892566874
##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -1077,435 +488,36 @@ def test_same_scheduler_processing_the_entire_dag(
log.info("out-start --\n%s\n-- out-end", out)
log.info("err-start --\n%s\n-- err-end", err)
- if self.use_otel != "true":
- # Dag run should have succeeded. Test the spans from the output.
- check_spans_without_continuance(output=out, dag=dag)
-
- @pytest.mark.execution_timeout(90)
- def test_scheduler_change_after_the_first_task_finishes(
- self, monkeypatch, celery_worker_env_vars, capfd, session
- ):
- """
- The scheduler thread will be paused after the first task ends and a
new scheduler process
- will handle the rest of the dag processing. The paused thread will be
resumed afterwards.
- """
-
- # For this test, scheduler1 must be idle but still considered healthy
by scheduler2.
- # If scheduler2 marks the job as unhealthy, then it will recreate
scheduler1's spans
- # because it will consider them lost.
- os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] =
"90"
-
- celery_worker_process = None
- scheduler_process_1 = None
- apiserver_process = None
- scheduler_process_2 = None
- try:
- # Start the processes here and not as fixtures or in a common
setup,
- # so that the test can capture their output.
- celery_worker_process, scheduler_process_1, apiserver_process =
self.start_worker_and_scheduler1()
-
- dag_id = "otel_test_dag_with_pause_between_tasks"
- dag = self.dags[dag_id]
-
- run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
- deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
- while True:
-                # To avoid getting stuck waiting.
- if time.monotonic() > deadline:
- raise TimeoutError(
- f"Timed out waiting for 'pause' to appear in
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
- )
-
- try:
- with open(self.control_file) as file:
- file_contents = file.read()
-
- if "pause" in file_contents:
- log.info("Control file exists and the task has
been paused.")
- break
- time.sleep(1)
- continue
- except FileNotFoundError:
- print("Control file not found. Waiting...")
- time.sleep(3)
- continue
-
- with capfd.disabled():
- # When the scheduler1 thread is paused, capfd keeps trying to
read the
- # file descriptors for the process and ends up freezing the
test.
- # Temporarily disable capfd to avoid that.
- scheduler_process_1.send_signal(signal.SIGSTOP)
-
- check_dag_run_state_and_span_status(
- dag_id=dag_id, run_id=run_id, state=State.RUNNING,
span_status=SpanStatus.ACTIVE
- )
-
- # Start the 2nd scheduler immediately without any delay to avoid
having the 1st scheduler
- # marked as unhealthy. If that happens, then the 2nd will recreate
the spans that the
- # 1st scheduler started.
- # The scheduler would also be considered unhealthy in case it was
paused
- # and the dag run continued running.
-
- scheduler_process_2 = subprocess.Popen(
- self.scheduler_command_args,
- env=os.environ.copy(),
- stdout=None,
- stderr=None,
- )
-
- # Rewrite the file to unpause the dag.
- with open(self.control_file, "w") as file:
- file.write("continue")
-
- wait_for_dag_run_and_check_span_status(
- dag_id=dag_id, run_id=run_id, max_wait_time=120,
span_status=SpanStatus.SHOULD_END
- )
-
- # Stop scheduler2 in case it still has a db lock on the dag_run.
- scheduler_process_2.terminate()
- scheduler_process_1.send_signal(signal.SIGCONT)
-
- # Wait for the scheduler to start again and continue running.
- time.sleep(10)
-
- wait_for_dag_run_and_check_span_status(
- dag_id=dag_id, run_id=run_id, max_wait_time=30,
span_status=SpanStatus.ENDED
- )
-
- print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
- finally:
- if self.log_level == "debug":
- with create_session() as session:
- dump_airflow_metadata_db(session)
-
- # Reset for the rest of the tests.
- os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"]
= "15"
-
- # Terminate the processes.
- celery_worker_process.terminate()
- celery_worker_process.wait()
-
- scheduler_process_1.terminate()
- scheduler_process_1.wait()
-
- apiserver_process.terminate()
- apiserver_process.wait()
-
- scheduler_process_2.wait()
-
- out, err = capfd.readouterr()
- log.info("out-start --\n%s\n-- out-end", out)
- log.info("err-start --\n%s\n-- err-end", err)
-
- if self.use_otel != "true":
- # Dag run should have succeeded. Test the spans in the output.
- check_spans_for_paused_dag(output=out, dag=dag,
is_recreated=False, check_t1_sub_spans=False)
-
- @pytest.mark.execution_timeout(90)
- def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
Review Comment:
The span only exists while the task process is running; whenever the task
process exits, the span will be emitted.
If there's an OOM kill, there's nothing to be done anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]