Copilot commented on code in PR #64576:
URL: https://github.com/apache/airflow/pull/64576#discussion_r3025337189
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -721,7 +703,6 @@ def _xcom_push(
task_id=ti.task_id,
run_id=ti.run_id,
map_index=ti.map_index,
- dag_result=ti.task.returns_dag_result,
_mapped_length=mapped_length,
)
Review Comment:
`_xcom_push()` no longer passes `dag_result` into `XCom.set()`. The
`SetXCom` comm message and `BaseXCom.set()` both still support `dag_result`,
and `DAG.add_result()` sets `operator.returns_dag_result = True`. Without
forwarding `dag_result=ti.task.returns_dag_result`, DAG result XComs will be
indistinguishable from normal return-value XComs. Please restore this keyword
argument in the `XCom.set()` call.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -148,9 +146,9 @@ def _make_task_span(msg: StartupDetails):
TraceContextTextMapPropagator().extract(msg.ti.context_carrier) if
msg.ti.context_carrier else None
)
ti = msg.ti
- span_name = f"worker.{ti.task_id}"
+ span_name = f"task_run.{ti.task_id}"
if ti.map_index is not None and ti.map_index >= 0:
- span_name += f"[{ti.map_index}]"
+ span_name += f"_{ti.map_index}"
Review Comment:
`_make_task_span` was changed to emit spans named `task_run.<task_id>` (with
mapped tasks suffixed as `_<map_index>`). This conflicts with the established OTel hierarchy
where the API server emits `task_run.*` spans and the task runner emits
`worker.*` spans (e.g. `airflow-core/tests/integration/otel/test_otel.py`
asserts `worker.task1` is a child of `task_run.task1`). Please revert this to
`worker.<task_id>` and keep the mapped format consistent with the rest of
tracing (`worker.<task_id>[<map_index>]`).
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -672,16 +670,6 @@ def get_dagrun_state(dag_id: str, run_id: str) -> str:
return response.state
Review Comment:
`RuntimeTaskInstance.get_dag()` was removed, but it is still part of the
public Task SDK runtime TI protocol (`task-sdk/src/airflow/sdk/types.py`
declares `@staticmethod def get_dag(dag_id: str) -> DagResult`). The comms
layer still defines `GetDag`/`DagResult` and the supervisor handles `GetDag`,
so removing this method is a breaking change for SDK consumers. Please restore
`get_dag()` (and related imports) or update the protocol + supervisor/message
types consistently if this removal is intentional.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -703,13 +691,7 @@ def mark_success_url(self) -> str:
return self.log_url
-def _xcom_push(
- ti: RuntimeTaskInstance,
- key: str,
- value: Any,
- *,
- mapped_length: int | None = None,
-) -> None:
+def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, mapped_length:
int | None = None) -> None:
Review Comment:
Changing `_xcom_push` to accept `mapped_length` positionally (by removing
the keyword-only `*`) makes call sites less self-documenting and drops the
guard that previously prevented accidental argument reordering. Since
`mapped_length` is an optional internal detail rather than a primary argument
of this helper, it is clearer and safer to keep it keyword-only.
```suggestion
def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, *,
mapped_length: int | None = None) -> None:
```
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -3082,15 +3051,14 @@ def execute(self, context):
runtime_ti = create_runtime_ti(task=task)
with mock.patch.object(XCom, "set") as mock_xcom_set:
- _xcom_push(runtime_ti, BaseXCom.XCOM_RETURN_KEY, result,
mapped_length=7)
+ _xcom_push(runtime_ti, BaseXCom.XCOM_RETURN_KEY, result, 7)
Review Comment:
`_xcom_push(runtime_ti, ..., result, 7)` relies on `mapped_length` being
positional. If `_xcom_push` keeps `mapped_length` keyword-only (which helps
prevent call-site mistakes), this should stay as `mapped_length=7` for clarity.
```suggestion
_xcom_push(runtime_ti, BaseXCom.XCOM_RETURN_KEY, result,
mapped_length=7)
```
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -3082,15 +3051,14 @@ def execute(self, context):
runtime_ti = create_runtime_ti(task=task)
with mock.patch.object(XCom, "set") as mock_xcom_set:
- _xcom_push(runtime_ti, BaseXCom.XCOM_RETURN_KEY, result,
mapped_length=7)
+ _xcom_push(runtime_ti, BaseXCom.XCOM_RETURN_KEY, result, 7)
mock_xcom_set.assert_called_once_with(
key=BaseXCom.XCOM_RETURN_KEY,
value=result,
dag_id=runtime_ti.dag_id,
task_id=runtime_ti.task_id,
run_id=runtime_ti.run_id,
map_index=runtime_ti.map_index,
- dag_result=False,
_mapped_length=7,
Review Comment:
The expected `XCom.set(...)` call no longer includes `dag_result`. Since
`SetXCom`/`BaseXCom.set()` still support `dag_result` and DAG results depend on
it, this assertion should include
`dag_result=runtime_ti.task.returns_dag_result` (or the appropriate explicit
boolean) once `_xcom_push` forwards it again.
```suggestion
_mapped_length=7,
dag_result=runtime_ti.task.returns_dag_result,
```
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -463,45 +448,27 @@ def
test_task_span_is_child_of_dag_run_span(make_ti_context):
sentry_integration="",
)
- # Step 4: emit the worker span (task runner side).
- task_runner_tracer =
provider.get_tracer("airflow.sdk.execution_time.task_runner")
- with mock.patch("airflow.sdk.execution_time.task_runner.tracer",
task_runner_tracer):
- with _make_task_span(what):
- pass
+ task_tracer = provider.get_tracer("airflow.sdk.execution_time.task_runner")
+ with mock.patch("airflow.sdk.execution_time.task_runner.tracer",
task_tracer):
+ with _make_task_span(what) as span:
+ task_span_ctx = span.get_span_context()
- # Step 5: emit the parent task span (API server side, as happens on task
completion).
- mock_ti = mock.MagicMock()
- mock_ti.dag_id = "test_dag"
- mock_ti.task_id = "my_task"
- mock_ti.run_id = "test_run"
- mock_ti.try_number = 1
- mock_ti.map_index = -1
- mock_ti.queued_dttm = None
- mock_ti.start_date = timezone.utcnow()
- mock_ti.dag_run.context_carrier = dag_run_carrier
- mock_ti.context_carrier = ti_carrier
- api_tracer =
provider.get_tracer("airflow.api_fastapi.execution_api.routes.task_instances")
- with
mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.tracer",
api_tracer):
- _emit_task_span(mock_ti, TaskInstanceState.SUCCESS)
+ # The task span must share the dag run's trace ID.
+ assert task_span_ctx.trace_id == dag_run_span_ctx.trace_id
+ # The task span's parent must be the dag run span.
finished = in_mem_exporter.get_finished_spans()
-
- # task_run.my_task: emitted by API server, child of dag run, span_id ==
parent_task_span_ctx.span_id.
task_spans = [s for s in finished if s.name == "task_run.my_task"]
assert len(task_spans) == 1
- task_span = task_spans[0]
- assert task_span.parent is not None
- assert task_span.parent.span_id == dag_run_span_ctx.span_id
- assert task_span.context.span_id == parent_task_span_ctx.span_id
-
- # worker.my_task: created by task runner, child of the parent task span.
- worker_spans = [s for s in finished if s.name == "worker.my_task"]
- assert len(worker_spans) == 1
- assert worker_spans[0].parent is not None
- assert worker_spans[0].parent.span_id == parent_task_span_ctx.span_id
+ assert task_spans[0].parent is not None
+ assert task_spans[0].parent.span_id == dag_run_span_ctx.span_id
Review Comment:
This test now expects the task runner to emit a `task_run.*` span directly
parented by the `dag_run.*` span. In the current tracing design, the API server
emits the `task_run.*` span (using the span ID generated in
`new_task_run_carrier`), and the task runner should emit a `worker.*` span as a
child of that `task_run.*` span. Please update this test to validate the
intended hierarchy (`dag_run -> task_run -> worker`) and the expected span
names/parenting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]