ashb commented on code in PR #49180:
URL: https://github.com/apache/airflow/pull/49180#discussion_r2059925068


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -873,11 +875,11 @@ def process_executor_events(
                 ti.pid,
             )
 
-            if (active_ti_span := cls.active_spans.get(ti.key)) is not None:
+            if (active_ti_span := cls.active_spans.get("ti:" + ti.id)) is not 
None:
                 cls.set_ti_span_attrs(span=active_ti_span, state=state, ti=ti)
                 # End the span and remove it from the active_spans dict.
                 active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
-                cls.active_spans.delete(ti.key)
+                cls.active_spans.delete("ti:" + ti.id)

Review Comment:
   Elsewhere you have `"ti:" + str(ti.id)` -- we should be consistent. I don't 
mind which way we go, but let's be consistent please



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, 
session: Session = NEW_SESSION)
     @provide_session
     def _end_active_spans(self, session: Session = NEW_SESSION):
         # No need to do a commit for every update. The annotation will commit 
all of them once at the end.
-        for key, span in self.active_spans.get_all().items():
-            from airflow.models.taskinstance import TaskInstanceKey
-
-            if isinstance(key, TaskInstanceKey):  # ti span.
-                # Can't compare the key directly because the try_number or the 
map_index might not be the same.
-                ti: TaskInstance = session.scalars(
-                    select(TaskInstance).where(
-                        TaskInstance.dag_id == key.dag_id,
-                        TaskInstance.task_id == key.task_id,
-                        TaskInstance.run_id == key.run_id,
-                    )
-                ).one()
-                if ti.state in State.finished:
-                    self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
-                    span.end(end_time=datetime_to_nano(ti.end_date))
-                    ti.span_status = SpanStatus.ENDED
-                else:
-                    span.end()
-                    ti.span_status = SpanStatus.NEEDS_CONTINUANCE
-            else:
-                dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == key)).one()
+        for prefixed_key, span in self.active_spans.get_all().items():
+            # Use partition to split on the first occurrence of ':'.
+            prefix, sep, key = prefixed_key.partition(":")
+
+            if prefix == "ti":
+                ti: TaskInstance | None = session.scalars(
+                    select(TaskInstance).where(TaskInstance.id == key)
+                ).one_or_none()
+
+                if ti is not None:
+                    if ti.state in State.finished:
+                        self.set_ti_span_attrs(span=span, state=ti.state, 
ti=ti)
+                        span.end(end_time=datetime_to_nano(ti.end_date))
+                        ti.span_status = SpanStatus.ENDED
+                    else:
+                        span.end()
+                        ti.span_status = SpanStatus.NEEDS_CONTINUANCE
+            elif prefix == "dr":
+                dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == str(key))).one()

Review Comment:
   `run_id` is only unique within a single DAG, it's not globally unique. This 
will either need to be DagRun.id, or we'll need to store the dag ID too.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, 
session: Session = NEW_SESSION)
     @provide_session
     def _end_active_spans(self, session: Session = NEW_SESSION):
         # No need to do a commit for every update. The annotation will commit 
all of them once at the end.
-        for key, span in self.active_spans.get_all().items():
-            from airflow.models.taskinstance import TaskInstanceKey
-
-            if isinstance(key, TaskInstanceKey):  # ti span.
-                # Can't compare the key directly because the try_number or the 
map_index might not be the same.
-                ti: TaskInstance = session.scalars(
-                    select(TaskInstance).where(
-                        TaskInstance.dag_id == key.dag_id,
-                        TaskInstance.task_id == key.task_id,
-                        TaskInstance.run_id == key.run_id,
-                    )
-                ).one()
-                if ti.state in State.finished:
-                    self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
-                    span.end(end_time=datetime_to_nano(ti.end_date))
-                    ti.span_status = SpanStatus.ENDED
-                else:
-                    span.end()
-                    ti.span_status = SpanStatus.NEEDS_CONTINUANCE
-            else:
-                dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == key)).one()
+        for prefixed_key, span in self.active_spans.get_all().items():
+            # Use partition to split on the first occurrence of ':'.
+            prefix, sep, key = prefixed_key.partition(":")
+
+            if prefix == "ti":
+                ti: TaskInstance | None = session.scalars(
+                    select(TaskInstance).where(TaskInstance.id == key)
+                ).one_or_none()

Review Comment:
   ```suggestion
                   ti: TaskInstance | None = session.get(TaskInstance, key)
   ```



##########
airflow-core/src/airflow/executors/workloads.py:
##########
@@ -69,7 +69,6 @@ class TaskInstance(BaseModel):
 
     parent_context_carrier: dict | None = None
     context_carrier: dict | None = None
-    queued_dttm: datetime | None = None

Review Comment:
   This looks like an un-intended change/a bad rebase?



##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1265,6 +1265,13 @@ traces:
       type: string
       example: ~
       default: "False"
+    otel_debug_traces_on:
+      description: |
+        If True, then traces from Airflow internal methods are exported. 
Defaults to False.
+      version_added: 3.0.0

Review Comment:
   ```suggestion
         version_added: 3.1.0
   ```
   
   (Or maybe 3.0.1 -- we'll need to see)



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, 
session: Session = NEW_SESSION)
     @provide_session
     def _end_active_spans(self, session: Session = NEW_SESSION):
         # No need to do a commit for every update. The annotation will commit 
all of them once at the end.
-        for key, span in self.active_spans.get_all().items():
-            from airflow.models.taskinstance import TaskInstanceKey
-
-            if isinstance(key, TaskInstanceKey):  # ti span.
-                # Can't compare the key directly because the try_number or the 
map_index might not be the same.
-                ti: TaskInstance = session.scalars(
-                    select(TaskInstance).where(
-                        TaskInstance.dag_id == key.dag_id,
-                        TaskInstance.task_id == key.task_id,
-                        TaskInstance.run_id == key.run_id,
-                    )
-                ).one()
-                if ti.state in State.finished:
-                    self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
-                    span.end(end_time=datetime_to_nano(ti.end_date))
-                    ti.span_status = SpanStatus.ENDED
-                else:
-                    span.end()
-                    ti.span_status = SpanStatus.NEEDS_CONTINUANCE
-            else:
-                dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == key)).one()
+        for prefixed_key, span in self.active_spans.get_all().items():
+            # Use partition to split on the first occurrence of ':'.
+            prefix, sep, key = prefixed_key.partition(":")

Review Comment:
   I wonder if instead of prefixing the string we should store the keys as 
tuples:
   
   `("ti", str(ti.id))`, `("dr", dr.id)` etc?
   
   Not much in it but maybe it makes things slightly clearer? (Though we'd have 
to be more careful of the type of the id we put in -- cos `("ti", UUID(...))` 
wouldn't match `("ti", "the_uuid")`



##########
airflow-core/src/airflow/traces/tracer.py:
##########
@@ -53,7 +53,7 @@ def add_span(func):
 
     @wraps(func)
     def wrapper(*args, **kwargs):
-        with Trace.start_span(span_name=func_name, component=component):
+        with DebugTrace.start_span(span_name=func_name, component=component):

Review Comment:
   Why is `add_span` always setting things as a DebugTrace? If it's only ever 
used for internal things I think we should rename this to `add_debug_span` to 
make it clearer



##########
airflow-core/tests/unit/core/test_otel_tracer.py:
##########
@@ -42,7 +42,8 @@ def name():
 class TestOtelTrace:
     def test_get_otel_tracer_from_trace_metaclass(self):
         """Test that `Trace.some_method()`, uses an `OtelTrace` instance when 
otel is configured."""
-        conf.add_section("traces")
+        if conf.getsection("traces") is None:
+            conf.add_section("traces")

Review Comment:
   ```suggestion
   ```
   
   Not needed anymore, `conf.set` now creates the section if needed.



##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -557,6 +557,7 @@ def print_ti_output_for_dag_run(dag_id: str, run_id: str):
 
 @pytest.mark.integration("redis")
 @pytest.mark.backend("postgres")
+@pytest.mark.timeout("240")  # 4 mins timeout for each test.

Review Comment:
   4m seems very long. Does it really take that long?



##########
airflow-core/tests/integration/otel/test_otel.py:
##########
@@ -1287,6 +1291,7 @@ def 
test_scheduler_exits_forcefully_in_the_middle_of_the_first_task(
             # Dag run should have succeeded. Test the spans in the output.
             check_spans_without_continuance(output=out, dag=dag, 
is_recreated=True, check_t1_sub_spans=False)
 
+    @pytest.mark.xfail(reason="Tests with a control file are flaky when 
running on the remote CI.")

Review Comment:
   What is the plan with these long term? Having them left as xfail adds very 
little and is there any point keeping them?



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1058,27 +1060,25 @@ def _update_dag_run_state_for_paused_dags(self, 
session: Session = NEW_SESSION)
     @provide_session
     def _end_active_spans(self, session: Session = NEW_SESSION):
         # No need to do a commit for every update. The annotation will commit 
all of them once at the end.
-        for key, span in self.active_spans.get_all().items():
-            from airflow.models.taskinstance import TaskInstanceKey
-
-            if isinstance(key, TaskInstanceKey):  # ti span.
-                # Can't compare the key directly because the try_number or the 
map_index might not be the same.
-                ti: TaskInstance = session.scalars(
-                    select(TaskInstance).where(
-                        TaskInstance.dag_id == key.dag_id,
-                        TaskInstance.task_id == key.task_id,
-                        TaskInstance.run_id == key.run_id,
-                    )
-                ).one()
-                if ti.state in State.finished:
-                    self.set_ti_span_attrs(span=span, state=ti.state, ti=ti)
-                    span.end(end_time=datetime_to_nano(ti.end_date))
-                    ti.span_status = SpanStatus.ENDED
-                else:
-                    span.end()
-                    ti.span_status = SpanStatus.NEEDS_CONTINUANCE
-            else:
-                dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == key)).one()
+        for prefixed_key, span in self.active_spans.get_all().items():
+            # Use partition to split on the first occurrence of ':'.
+            prefix, sep, key = prefixed_key.partition(":")
+
+            if prefix == "ti":
+                ti: TaskInstance | None = session.scalars(
+                    select(TaskInstance).where(TaskInstance.id == key)
+                ).one_or_none()
+
+                if ti is not None:
+                    if ti.state in State.finished:
+                        self.set_ti_span_attrs(span=span, state=ti.state, 
ti=ti)
+                        span.end(end_time=datetime_to_nano(ti.end_date))
+                        ti.span_status = SpanStatus.ENDED
+                    else:
+                        span.end()
+                        ti.span_status = SpanStatus.NEEDS_CONTINUANCE
+            elif prefix == "dr":
+                dag_run: DagRun = 
session.scalars(select(DagRun).where(DagRun.run_id == str(key))).one()

Review Comment:
   Whiiich it turns out was a bug in the existing impl. Oops.



##########
pyproject.toml:
##########
@@ -847,6 +847,9 @@ addopts = [
     "--ignore-glob=**/tests/system/*",
     "--ignore-glob=tests/system/*",
 ]
+markers = [
+    "timeout: mark tests with a timeout (provided by pytest-timeout)",
+]

Review Comment:
   I'm surprised we needed to add this -- we have lots of tests with timeouts 
already... 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to