hussein-awala commented on code in PR #68568:
URL: https://github.com/apache/airflow/pull/68568#discussion_r3441918962
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -732,9 +732,10 @@ def __hash__(self):
@property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
- return prune_dict(
{"dag_id": self.dag_id, "task_id": self.task_id, "team_name": getattr(self, "_team_name", None)}
- )
+ tags = self.dag_run.stats_tags
Review Comment:
Let's try to avoid this: any new tag added unconditionally to the `dag_run` tags (like `run_type`) will also be added to the TI tags automatically.
If you want to load the DAG tags, we may need to make `_dag_tags_for_stats` public.
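To make the concern concrete, here is a minimal sketch with plain functions standing in for the real properties. All names and values are hypothetical, not Airflow APIs. Inheriting everything from the DagRun tags lets any DagRun-level key such as `run_type` leak into TI metrics. Building the TI tags from a dag-tags-only helper (assuming `_dag_tags_for_stats` were made public) keeps the TI tag set explicit.

```python
# All names and values below are hypothetical stand-ins, not Airflow APIs.


def dag_tags_for_stats() -> dict[str, str]:
    # Stand-in for a public DagRun._dag_tags_for_stats: only tags derived from the Dag's tags.
    return {"team": "data-eng"}


def dag_run_stats_tags() -> dict[str, str]:
    # Stand-in for DagRun.stats_tags: dag tags plus DagRun-level keys.
    return {**dag_tags_for_stats(), "dag_id": "example_dag", "run_type": "scheduled"}


def ti_tags_inherited(task_id: str) -> dict[str, str]:
    # Pattern from the diff: start from everything the DagRun emits.
    tags = dag_run_stats_tags()
    tags.pop("team_name", None)
    return {**tags, "task_id": task_id}


def ti_tags_explicit(dag_id: str, task_id: str) -> dict[str, str]:
    # Alternative: take only the dag tags, then set the TI's own keys (built-ins win).
    return {**dag_tags_for_stats(), "dag_id": dag_id, "task_id": task_id}
```

With this setup, `ti_tags_inherited("extract")` contains `run_type`, while `ti_tags_explicit("example_dag", "extract")` does not.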
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -732,9 +732,10 @@ def __hash__(self):
@property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
- return prune_dict(
{"dag_id": self.dag_id, "task_id": self.task_id, "team_name": getattr(self, "_team_name", None)}
- )
+ tags = self.dag_run.stats_tags
+ tags.pop("team_name", None)
+ base = prune_dict({"task_id": self.task_id, "team_name": getattr(self, "_team_name", None)})
+ return {**tags, **base}
Review Comment:
Do we want to propagate `run_type` here?
If not, we should select only the tags we want from `self.dag_run.stats_tags`.
If yes, we'll also need to add the same tag in [task_runner.py](https://github.com/apache/airflow/pull/68568/changes#diff-5bef10ab2956abf7360dbf9b509b6e1113407874d24abcc1b276475051f13abf) (I'll add a separate comment there).
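If `run_type` should not propagate, the selection could be an explicit allow-list. That way, a new DagRun-level key only reaches TI metrics when someone adds it on purpose. This is a minimal sketch under assumptions: `TI_INHERITED_KEYS`, `select_tags`, and `ti_stats_tags` are hypothetical names. `dag_tag_keys` stands for the keys produced from the Dag's tags, however those end up being obtained.

```python
from __future__ import annotations

from collections.abc import Iterable, Mapping

# Hypothetical allow-list of DagRun-level keys a TI is meant to inherit.
TI_INHERITED_KEYS = frozenset({"dag_id"})


def select_tags(source: Mapping[str, str], allowed: Iterable[str]) -> dict[str, str]:
    """Copy only the allowed keys from ``source``; allowed keys that are absent are skipped."""
    allowed_set = frozenset(allowed)
    return {key: value for key, value in source.items() if key in allowed_set}


def ti_stats_tags(
    dag_run_tags: Mapping[str, str],
    dag_tag_keys: Iterable[str],
    task_id: str,
    team_name: str | None = None,
) -> dict[str, str]:
    # Inherit the fixed allow-list plus the keys that came from dag tags;
    # any other DagRun key (e.g. run_type, team_name) is dropped.
    tags = select_tags(dag_run_tags, TI_INHERITED_KEYS | frozenset(dag_tag_keys))
    tags["task_id"] = task_id
    if team_name:
        tags["team_name"] = team_name
    return tags
```

The trade-off is one more place to update when a tag should propagate. In exchange, the TI tag set stays stable when the DagRun gains new keys.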
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -656,6 +686,12 @@ def get_running_dag_runs_to_examine(cls, session: Session) -> ScalarResult[DagRu
.limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE)
)
+ # When dag tags are emitted as metric tags, eagerly load dag_model.tags so stats_tags
+ # does not fire a per-DagRun N+1 lazy load in the scheduler loop. Off by default, so the
+ # extra load is only paid when the feature is enabled.
+ if airflow_conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False):
+ query = query.options(joinedload(cls.dag_model).selectinload(DagModel.tags))
+
Review Comment:
I wasn't sure about this, so I checked with Claude. Here is the resulting analysis:
## Analysis: dag tags reach only a subset of metric emission sites
**Finding:** With `dag_tags_in_metrics` enabled, dag tags are silently
dropped on several emission paths. `_dag_tags_for_stats()` returns `{}` unless
`dag_run.dag_model.tags` is already loaded, and the only place that eager-loads
it is `get_running_dag_runs_to_examine`.
**Evidence:**
- Eager-load exists only here: `get_running_dag_runs_to_examine` →
`query.options(joinedload(cls.dag_model).selectinload(DagModel.tags))`.
- `_dag_tags_for_stats()` refuses to lazy-load: `if "dag_model" in sa_inspect(self).unloaded: return {}`.
**Coverage:**
| Path | Metrics | Dag tags |
|---|---|---|
| Main scheduling loop (`_schedule_dag_run`) | `dagrun.duration.*`, `first_task_scheduling_delay`, `dependency-check` | ✅ |
| Worker (`RuntimeTaskInstance.stats_tags`, in-memory dag) | `ti.start`, `ti_successes`, `operator_successes`, `task.duration`, worker `ti_failures`/`operator_failures` | ✅ |
| `_update_dag_run_state_for_paused_dags` | `update_state` metrics | ❌ (query joins `dag_model` but no `selectinload(tags)`) |
| `dag.test()` | `update_state` metrics | ❌ |
| DagRun creation/verify | `task_instance_created`, `task_restored_to_dag`, `task_removed_from_dag` | ❌ |
| Server-side TI (`handle_failure`, `_check_and_change_state_before_execution`) | `previously_succeeded`, scheduler `ti_failures`/`operator_failures` | ❌ (`dag_run.dag_model.tags` never eager-loaded; runs in a separate session so identity-map reuse doesn't help) |
**Impact:** The same metric name carries dag tags from one emitter and not
another (e.g. `ti_failures` tagged from the worker, untagged from the
scheduler) — easy to misread on a dashboard, and the feature under-delivers its
stated "all Dag-run and task-instance metrics" scope.
**Recommendation:** Either eager-load `dag_model.tags` (gated on the flag)
on these other paths, or document the limitation explicitly.
I then checked the `ti_failures` metric myself. On a normal failure it is emitted by the worker (`task_runner.py`). When the worker itself has a problem, it is emitted by the scheduler instead (`scheduler_job_runner.py`, then `taskinstance.py`), and on that path the dag tags are not loaded.
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -253,8 +254,13 @@ class RuntimeTaskInstance(TaskInstance):
@property
def stats_tags(self) -> dict[str, str]:
- """Metric tags for this task instance, including team_name when available."""
- tags: dict[str, str] = {"dag_id": self.dag_id, "task_id": self.task_id}
+ """Metric tags for this task instance, including dag tags and team_name when available."""
+ tags: dict[str, str] = {}
+ if conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False):
+ tags.update(build_dag_metric_tags(self.task.dag.tags))
+ # Built-in keys always win on collision.
+ tags["dag_id"] = self.dag_id
+ tags["task_id"] = self.task_id
if self._ti_context_from_server and self._ti_context_from_server.dag_run.team_name:
tags["team_name"] = self._ti_context_from_server.dag_run.team_name
return tags
Review Comment:
The `run_type` tag is missing here, which makes these tags inconsistent with the ones built in `taskinstance.py`; see my comment above.
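If the answer to the `run_type` question is yes, the worker side could read it from the DagRun context the server already sends, next to `team_name`. This is a minimal sketch with stand-in classes rather than the real `RuntimeTaskInstance`. It makes three assumptions:

- The context's `dag_run` exposes a `run_type` attribute.
- A plain `dag_tags_enabled` field replaces the `conf.getboolean(...)` call.
- The hypothetical stand-in for `build_dag_metric_tags` uses a made-up key format that may differ from the real helper.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class DagRunContext:
    # Stand-in for the DagRun part of the TI context sent by the server (assumed fields).
    run_type: str
    team_name: str | None = None


@dataclass
class TIContext:
    dag_run: DagRunContext


def build_dag_metric_tags(dag_tags: list[str]) -> dict[str, str]:
    # Hypothetical stand-in; the real helper in the PR may format keys differently.
    return {f"dag_tag_{tag}": "true" for tag in dag_tags}


@dataclass
class WorkerTI:
    dag_id: str
    task_id: str
    dag_tags: list[str] = field(default_factory=list)
    ti_context_from_server: TIContext | None = None
    # Stands in for conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False).
    dag_tags_enabled: bool = False

    @property
    def stats_tags(self) -> dict[str, str]:
        tags: dict[str, str] = {}
        if self.dag_tags_enabled:
            tags.update(build_dag_metric_tags(self.dag_tags))
        # Built-in keys always win on collision.
        tags["dag_id"] = self.dag_id
        tags["task_id"] = self.task_id
        if self.ti_context_from_server:
            dag_run = self.ti_context_from_server.dag_run
            # Mirror the run_type tag the DagRun-derived TI tags would carry.
            tags["run_type"] = dag_run.run_type
            if dag_run.team_name:
                tags["team_name"] = dag_run.team_name
        return tags
```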
--
This is an automated message from the Apache Git Service.