ashb commented on code in PR #68568:
URL: https://github.com/apache/airflow/pull/68568#discussion_r3435552794
##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -732,9 +734,29 @@ def __hash__(self):
@property
def stats_tags(self) -> dict[str, str]:
"""Returns task instance tags."""
- return prune_dict(
- {"dag_id": self.dag_id, "task_id": self.task_id, "team_name":
getattr(self, "_team_name", None)}
+ from sqlalchemy import inspect as sa_inspect
+
+ tags: dict[str, str] = {}
+ if conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False):
+ try:
+ dr = self.dag_run # lazy="joined" — always in memory when TI
is loaded
+ if dr is not None and "dag_model" not in
sa_inspect(dr).unloaded:
+ dm = dr.dag_model
+ if dm is not None and "tags" not in
sa_inspect(dm).unloaded and dm.tags:
+ tags.update(build_dag_metric_tags(tag.name for tag in
dm.tags))
+ except SQLAlchemyError:
+ pass
+ # Built-in keys always win on collision.
+ tags.update(
+ prune_dict(
+ {
+ "dag_id": self.dag_id,
+ "task_id": self.task_id,
+ "team_name": getattr(self, "_team_name", None),
+ }
+ )
Review Comment:
Isn't all of this the same as `return self.dag_run.stats_tags`? If not, why
not?
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -499,11 +500,29 @@ def check_version_id_exists_in_dr(self, dag_version_id:
UUID, *, session: Sessio
)
return session.scalar(select_stmt)
+ def _dag_tags_for_stats(self) -> dict[str, str]:
+ """Convert dag tags to metric tags. Tags with ':' become key:value;
others are standalone (empty value)."""
+ if not airflow_conf.getboolean("metrics", "dag_tags_in_metrics",
fallback=False):
+ return {}
+ try:
+ # dag_model is a lazy, view-only relationship; on a
detached/expired DagRun the
+ # load raises. Metric tagging must never break the caller, so
swallow that.
Review Comment:
This is the right behaviour, but have we ensured that dag_model is eagerly
loaded so that this actually works and the tags make it to metrics when run for
real?
##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -4323,7 +4323,8 @@ def test_stats_tags_without_team_name(self, dag_maker):
dr = dag_maker.create_dagrun()
tags = dr.stats_tags
assert "team_name" not in tags
- assert tags == {"dag_id": "test_dag", "run_type": "manual"}
+ assert tags["dag_id"] == "test_dag"
+ assert tags["run_type"] == "manual"
Review Comment:
Why aren't we testing/asserting the complete set of tags?
I'd probably suggest renaming `class TestDagRunStatsTagsTeamName` and
combining it with the new one into a single `class TestDagRunStatTags`
(Or better yet, delete these extra test classes that LLM agents love to add,
and move them all to top-level test functions outside of any class, which is
what we do for _most_ of the tests in this file.)
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1249,6 +1249,17 @@ metrics:
type: boolean
example: ~
default: "False"
+ dag_tags_in_metrics:
+ description: |
+ Set to ``True`` to include Dag tags as metric tags on all Dag-run and
task-instance metrics.
+ Tags that contain a colon (e.g. ``env:prod``) are split into a
key/value pair. Plain tags
+ (e.g. ``production``) are emitted as standalone DogStatsd tags, or as
``tag=true`` in InfluxDB
+ line-protocol format. Disabled by default to avoid unexpected
cardinality increases, since Dag
+ tags are free-form, user-defined strings.
+ version_added: 3.3.0
Review Comment:
I think we've missed the cut for 3.3 unfortunately, right @vatsrahul1001?
```suggestion
version_added: 3.4.0
```
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -253,8 +254,14 @@ class RuntimeTaskInstance(TaskInstance):
@property
def stats_tags(self) -> dict[str, str]:
- """Metric tags for this task instance, including team_name when
available."""
- tags: dict[str, str] = {"dag_id": self.dag_id, "task_id": self.task_id}
+ """Metric tags for this task instance, including dag tags and
team_name when available."""
+ tags: dict[str, str] = {}
+ if conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False):
+ dag = getattr(self.task, "dag", None)
+ tags.update(build_dag_metric_tags(getattr(dag, "tags", None) or
()))
Review Comment:
We are in the worker here, where the parsed dag exists. getattr should not
be needed.
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -6024,3 +6024,43 @@ def test_bad_declaration_is_skipped_not_fatal(self):
with patch("airflow.sdk.execution_time.task_runner.allow_class",
side_effect=ValueError("nope")):
# Must not raise -- the walk swallows per-class registration
errors.
_register_deserialization_allowed_classes(dag,
structlog.get_logger())
+
+
+class TestDagTagsForStats:
Review Comment:
Ditto -- no new test class please
##########
shared/observability/src/airflow_shared/observability/metrics/tag_utils.py:
##########
Review Comment:
Pet peeve of mine: utils files; they very quickly and easily become a
dumping ground for anything and everything.
Can we either not have "utils" in the name, or find a different existing
place to put this fn?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]