xBis7 commented on code in PR #62554:
URL: https://github.com/apache/airflow/pull/62554#discussion_r2891628134


##########
airflow-core/src/airflow/observability/traces/__init__.py:
##########
@@ -15,3 +15,115 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
+import logging
+import os
+from contextlib import contextmanager
+from importlib.metadata import entry_points
+
+from opentelemetry import context, trace
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
+from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
+
+from airflow.configuration import conf
+
+log = logging.getLogger(__name__)
+
+OVERRIDE_SPAN_ID_KEY = context.create_key("override_span_id")
+OVERRIDE_TRACE_ID_KEY = context.create_key("override_trace_id")
+
+
+class OverrideableRandomIdGenerator(RandomIdGenerator):
+    """Lets you override the span id."""
+
+    def generate_span_id(self):
+        override = context.get_value(OVERRIDE_SPAN_ID_KEY)
+        if override is not None:
+            return override
+        return super().generate_span_id()
+
+    def generate_trace_id(self):
+        override = context.get_value(OVERRIDE_TRACE_ID_KEY)
+        if override is not None:
+            return override
+        return super().generate_trace_id()
+
+
+@contextmanager
+def override_ids(trace_id, span_id, ctx=None):
+    ctx = context.set_value(OVERRIDE_TRACE_ID_KEY, trace_id, context=ctx)
+    ctx = context.set_value(OVERRIDE_SPAN_ID_KEY, span_id, context=ctx)
+    token = context.attach(ctx)
+    try:
+        yield
+    finally:
+        context.detach(token)
+
+
+def _get_backcompat_config() -> tuple[str | None, Resource | None]:
+    """
+    Possibly get deprecated Airflow configs for otel.
+
+    Ideally we return (None, None) here.  But if the old configuration is 
there,
+    then we will use it.
+    """
+    resource = None
+    if not os.environ.get("OTEL_SERVICE_NAME") and not 
os.environ.get("OTEL_RESOURCE_ATTRIBUTES"):
+        service_name = conf.get("traces", "otel_service", fallback=None)
+        if service_name:
+            resource = Resource({"service.name": service_name})
+
+    endpoint = None
+    if not os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT"):

Review Comment:
   Users might also set `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` instead of 
`OTEL_EXPORTER_OTLP_ENDPOINT`.



##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -1077,435 +488,36 @@ def test_same_scheduler_processing_the_entire_dag(
         log.info("out-start --\n%s\n-- out-end", out)
         log.info("err-start --\n%s\n-- err-end", err)
 
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans from the output.
-            check_spans_without_continuance(output=out, dag=dag)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_change_after_the_first_task_finishes(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The scheduler thread will be paused after the first task ends and a 
new scheduler process
-        will handle the rest of the dag processing. The paused thread will be 
resumed afterwards.
-        """
-
-        # For this test, scheduler1 must be idle but still considered healthy 
by scheduler2.
-        # If scheduler2 marks the job as unhealthy, then it will recreate 
scheduler1's spans
-        # because it will consider them lost.
-        os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = 
"90"
-
-        celery_worker_process = None
-        scheduler_process_1 = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_between_tasks"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            with capfd.disabled():
-                # When the scheduler1 thread is paused, capfd keeps trying to 
read the
-                # file descriptors for the process and ends up freezing the 
test.
-                # Temporarily disable capfd to avoid that.
-                scheduler_process_1.send_signal(signal.SIGSTOP)
-
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Start the 2nd scheduler immediately without any delay to avoid 
having the 1st scheduler
-            # marked as unhealthy. If that happens, then the 2nd will recreate 
the spans that the
-            # 1st scheduler started.
-            # The scheduler would also be considered unhealthy in case it was 
paused
-            # and the dag run continued running.
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.SHOULD_END
-            )
-
-            # Stop scheduler2 in case it still has a db lock on the dag_run.
-            scheduler_process_2.terminate()
-            scheduler_process_1.send_signal(signal.SIGCONT)
-
-            # Wait for the scheduler to start again and continue running.
-            time.sleep(10)
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=30, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Reset for the rest of the tests.
-            os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] 
= "15"
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            scheduler_process_1.terminate()
-            scheduler_process_1.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_for_paused_dag(output=out, dag=dag, 
is_recreated=False, check_t1_sub_spans=False)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The scheduler that starts the dag run will be stopped, while the first 
task is executing,
-        and start a new scheduler will be started. That way, the new process 
will pick up the dag processing.
-        The initial scheduler will exit gracefully.
-        """
-
-        celery_worker_process = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_in_task"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            # Since, we are past the loop, then the file exists and the dag 
has been paused.
-            # Terminate scheduler1 and start scheduler2.
-            with capfd.disabled():
-                scheduler_process_1.terminate()
-
-            assert scheduler_process_1.wait() == 0
-
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.NEEDS_CONTINUANCE
-            )
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.terminate()
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_with_continuance(output=out, dag=dag)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_forcefully_in_the_middle_of_the_first_task(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The first scheduler will exit forcefully while the first task is 
running,
-        so that it won't have time end any active spans.
-        """
-
-        celery_worker_process = None
-        scheduler_process_2 = None
-        apiserver_process = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_in_task"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            # Since, we are past the loop, then the file exists and the dag 
has been paused.
-            # Kill scheduler1 and start scheduler2.
-            with capfd.disabled():
-                scheduler_process_1.send_signal(signal.SIGKILL)
-
-            # The process shouldn't have changed the span_status.
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Wait so that the health threshold passes and scheduler1 is 
considered unhealthy.
-            time.sleep(15)
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Wait for scheduler2 to be up and running.
-            time.sleep(10)
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.terminate()
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_without_continuance(output=out, dag=dag, 
is_recreated=True, check_t1_sub_spans=False)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_forcefully_after_the_first_task_finishes(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The first scheduler will exit forcefully after the first task finishes,
-        so that it won't have time to end any active spans.
-        In this scenario, the sub-spans for the first task will be lost.
-        The only way to retrieve them, would be to re-run the task.
-        """
-
-        celery_worker_process = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_between_tasks"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            # Since, we are past the loop, then the file exists and the dag 
has been paused.
-            # Kill scheduler1 and start scheduler2.
-            with capfd.disabled():
-                scheduler_process_1.send_signal(signal.SIGKILL)
-
-            # The process shouldn't have changed the span_status.
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            time.sleep(15)
-            # The task should be adopted.
+        # host = "host.docker.internal"
+        host = "jaeger"
+        service_name = os.environ.get("OTEL_SERVICE_NAME", "test")
+        # service_name ``= "my-service-name"
+        r = 
requests.get(f"http://{host}:16686/api/traces?service={service_name}")
+        data = r.json()
 
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
+        trace = data["data"][-1]
+        spans = trace["spans"]
 
-            apiserver_process.terminate()
-            apiserver_process.wait()
+        def get_span_hierarchy():
+            spans_dict = {x["spanID"]: x for x in spans}
 
-            scheduler_process_2.terminate()
-            scheduler_process_2.wait()
+            def get_parent_span_id(span):
+                parents = [x["spanID"] for x in span["references"] if 
x["refType"] == "CHILD_OF"]
+                if parents:
+                    parent_id = parents[0]
+                    return spans_dict[parent_id]["operationName"]
 
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
+            nested = {x["operationName"]: get_parent_span_id(x) for x in spans}
+            return nested
 
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_for_paused_dag(output=out, dag=dag, is_recreated=True, 
check_t1_sub_spans=False)
-
-    def start_worker_and_scheduler1(self):
-        celery_worker_process = subprocess.Popen(
-            self.celery_command_args,
-            env=os.environ.copy(),
-            stdout=None,
-            stderr=None,
-        )
+        nested = get_span_hierarchy()
+        assert nested == {
+            "sub_span1": "task_run.task1",
+            "task_run.task1": "dag_run.otel_test_dag",
+            "dag_run.otel_test_dag": None,
+        }
 
+    def start_worker_and_scheduler(self):

Review Comment:
   Maybe rename it to something else?



##########
airflow-core/src/airflow/executors/workloads/task.py:
##########
@@ -86,7 +86,7 @@ def make(
         from airflow.utils.helpers import log_filename_template_renderer
 
         ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
-        ser_ti.parent_context_carrier = ti.dag_run.context_carrier
+        ser_ti.context_carrier = ti.dag_run.context_carrier

Review Comment:
   Can you explain this change?



##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -1077,435 +488,36 @@ def test_same_scheduler_processing_the_entire_dag(
         log.info("out-start --\n%s\n-- out-end", out)
         log.info("err-start --\n%s\n-- err-end", err)
 
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans from the output.
-            check_spans_without_continuance(output=out, dag=dag)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_change_after_the_first_task_finishes(
-        self, monkeypatch, celery_worker_env_vars, capfd, session
-    ):
-        """
-        The scheduler thread will be paused after the first task ends and a 
new scheduler process
-        will handle the rest of the dag processing. The paused thread will be 
resumed afterwards.
-        """
-
-        # For this test, scheduler1 must be idle but still considered healthy 
by scheduler2.
-        # If scheduler2 marks the job as unhealthy, then it will recreate 
scheduler1's spans
-        # because it will consider them lost.
-        os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = 
"90"
-
-        celery_worker_process = None
-        scheduler_process_1 = None
-        apiserver_process = None
-        scheduler_process_2 = None
-        try:
-            # Start the processes here and not as fixtures or in a common 
setup,
-            # so that the test can capture their output.
-            celery_worker_process, scheduler_process_1, apiserver_process = 
self.start_worker_and_scheduler1()
-
-            dag_id = "otel_test_dag_with_pause_between_tasks"
-            dag = self.dags[dag_id]
-
-            run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)
-
-            deadline = time.monotonic() + self.max_wait_seconds_for_pause
-
-            while True:
-                # To avoid get stuck waiting.
-                if time.monotonic() > deadline:
-                    raise TimeoutError(
-                        f"Timed out waiting for 'pause' to appear in 
{self.control_file}, after {self.max_wait_seconds_for_pause} seconds."
-                    )
-
-                try:
-                    with open(self.control_file) as file:
-                        file_contents = file.read()
-
-                        if "pause" in file_contents:
-                            log.info("Control file exists and the task has 
been paused.")
-                            break
-                        time.sleep(1)
-                        continue
-                except FileNotFoundError:
-                    print("Control file not found. Waiting...")
-                    time.sleep(3)
-                    continue
-
-            with capfd.disabled():
-                # When the scheduler1 thread is paused, capfd keeps trying to 
read the
-                # file descriptors for the process and ends up freezing the 
test.
-                # Temporarily disable capfd to avoid that.
-                scheduler_process_1.send_signal(signal.SIGSTOP)
-
-            check_dag_run_state_and_span_status(
-                dag_id=dag_id, run_id=run_id, state=State.RUNNING, 
span_status=SpanStatus.ACTIVE
-            )
-
-            # Start the 2nd scheduler immediately without any delay to avoid 
having the 1st scheduler
-            # marked as unhealthy. If that happens, then the 2nd will recreate 
the spans that the
-            # 1st scheduler started.
-            # The scheduler would also be considered unhealthy in case it was 
paused
-            # and the dag run continued running.
-
-            scheduler_process_2 = subprocess.Popen(
-                self.scheduler_command_args,
-                env=os.environ.copy(),
-                stdout=None,
-                stderr=None,
-            )
-
-            # Rewrite the file to unpause the dag.
-            with open(self.control_file, "w") as file:
-                file.write("continue")
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=120, 
span_status=SpanStatus.SHOULD_END
-            )
-
-            # Stop scheduler2 in case it still has a db lock on the dag_run.
-            scheduler_process_2.terminate()
-            scheduler_process_1.send_signal(signal.SIGCONT)
-
-            # Wait for the scheduler to start again and continue running.
-            time.sleep(10)
-
-            wait_for_dag_run_and_check_span_status(
-                dag_id=dag_id, run_id=run_id, max_wait_time=30, 
span_status=SpanStatus.ENDED
-            )
-
-            print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
-        finally:
-            if self.log_level == "debug":
-                with create_session() as session:
-                    dump_airflow_metadata_db(session)
-
-            # Reset for the rest of the tests.
-            os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] 
= "15"
-
-            # Terminate the processes.
-            celery_worker_process.terminate()
-            celery_worker_process.wait()
-
-            scheduler_process_1.terminate()
-            scheduler_process_1.wait()
-
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
-            scheduler_process_2.wait()
-
-        out, err = capfd.readouterr()
-        log.info("out-start --\n%s\n-- out-end", out)
-        log.info("err-start --\n%s\n-- err-end", err)
-
-        if self.use_otel != "true":
-            # Dag run should have succeeded. Test the spans in the output.
-            check_spans_for_paused_dag(output=out, dag=dag, 
is_recreated=False, check_t1_sub_spans=False)
-
-    @pytest.mark.execution_timeout(90)
-    def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(

Review Comment:
   If the scheduler changes, the dag run span will be fine because it's emitted 
at the end. What about the task span? Is it worth checking that?
   
   It's up to the worker to run the `task_runner`, right? Then there are two 
scenarios: either the worker exited gracefully and ended the spans, or it did 
not. If I remember correctly, we already discussed this; I just want to verify.
   
   Also, is it worth testing what happens with the task spans once the worker 
exits gracefully? Does the new worker create a new span for the rest of the 
task?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to