ashb commented on code in PR #68568:
URL: https://github.com/apache/airflow/pull/68568#discussion_r3435552794


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -732,9 +734,29 @@ def __hash__(self):
     @property
     def stats_tags(self) -> dict[str, str]:
         """Returns task instance tags."""
-        return prune_dict(
-            {"dag_id": self.dag_id, "task_id": self.task_id, "team_name": getattr(self, "_team_name", None)}
+        from sqlalchemy import inspect as sa_inspect
+
+        tags: dict[str, str] = {}
+        if conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False):
+            try:
+                dr = self.dag_run  # lazy="joined" — always in memory when TI 
is loaded
+                if dr is not None and "dag_model" not in sa_inspect(dr).unloaded:
+                    dm = dr.dag_model
+                    if dm is not None and "tags" not in sa_inspect(dm).unloaded and dm.tags:
+                        tags.update(build_dag_metric_tags(tag.name for tag in dm.tags))
+            except SQLAlchemyError:
+                pass
+        # Built-in keys always win on collision.
+        tags.update(
+            prune_dict(
+                {
+                    "dag_id": self.dag_id,
+                    "task_id": self.task_id,
+                    "team_name": getattr(self, "_team_name", None),
+                }
+            )

Review Comment:
   Isn't all of this the same as `return self.dag_run.stats_tags`? If not, 
why not?



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -499,11 +500,29 @@ def check_version_id_exists_in_dr(self, dag_version_id: UUID, *, session: Sessio
         )
         return session.scalar(select_stmt)
 
+    def _dag_tags_for_stats(self) -> dict[str, str]:
+        """Convert dag tags to metric tags. Tags with ':' become key:value; others are standalone (empty value)."""
+        if not airflow_conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False):
+            return {}
+        try:
+            # dag_model is a lazy, view-only relationship; on a detached/expired DagRun the
+            # load raises. Metric tagging must never break the caller, so swallow that.

Review Comment:
   This is the right behaviour, but have we ensured that dag_model is eagerly 
loaded so that this actually works and the tags make it to metrics when run for 
real?



##########
airflow-core/tests/unit/models/test_dagrun.py:
##########
@@ -4323,7 +4323,8 @@ def test_stats_tags_without_team_name(self, dag_maker):
         dr = dag_maker.create_dagrun()
         tags = dr.stats_tags
         assert "team_name" not in tags
-        assert tags == {"dag_id": "test_dag", "run_type": "manual"}
+        assert tags["dag_id"] == "test_dag"
+        assert tags["run_type"] == "manual"

Review Comment:
   Why aren't we testing/asserting the complete set of tags?
   
   I'd probably suggest renaming `class TestDagRunStatsTagsTeamName` and 
combining it with the new one into a single `class TestDagRunStatTags`.
   
   (Or better yet, delete these extra test classes that LLM agents love to add, 
and move these all to top-level test functions outside of any class, which is 
what we do for _most_ of the tests in this file.)



##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1249,6 +1249,17 @@ metrics:
       type: boolean
       example: ~
       default: "False"
+    dag_tags_in_metrics:
+      description: |
+        Set to ``True`` to include Dag tags as metric tags on all Dag-run and task-instance metrics.
+        Tags that contain a colon (e.g. ``env:prod``) are split into a key/value pair. Plain tags
+        (e.g. ``production``) are emitted as standalone DogStatsd tags, or as ``tag=true`` in InfluxDB
+        line-protocol format. Disabled by default to avoid unexpected cardinality increases, since Dag
+        tags are free-form, user-defined strings.
+      version_added: 3.3.0

Review Comment:
   I think we've missed the cut for 3.3 unfortunately, right @vatsrahul1001?
   
   ```suggestion
         version_added: 3.4.0
   ```
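
   For reference, a minimal sketch of the splitting rule the description above implies. It assumes the split happens on the first colon, that plain tags get an empty value, and that built-in keys are applied last so they win on collision (as the `taskinstance.py` hunk does). `split_dag_tags` is a hypothetical name used for illustration only; it is not the PR's `build_dag_metric_tags`.

```python
from collections.abc import Iterable


def split_dag_tags(tag_names: Iterable[str], builtin: dict[str, str]) -> dict[str, str]:
    """Hypothetical helper: turn free-form Dag tags into metric tags."""
    tags: dict[str, str] = {}
    for name in tag_names:
        # Assumption: split on the first colon only, so "a:b:c" -> ("a", "b:c").
        key, sep, value = name.partition(":")
        if not key:
            # Assumption: a tag with an empty key (e.g. ":prod") is skipped.
            continue
        # Plain tags (no colon) are kept as standalone tags with an empty value.
        tags[key] = value if sep else ""
    # Built-in keys are applied last, so they override any colliding Dag tag.
    tags.update(builtin)
    return tags


result = split_dag_tags(
    ["env:prod", "production", "dag_id:spoofed"],
    {"dag_id": "test_dag", "run_type": "manual"},
)
print(result)
```

   With this ordering, a user tag such as `dag_id:spoofed` cannot replace the real `dag_id`.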



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -253,8 +254,14 @@ class RuntimeTaskInstance(TaskInstance):
 
     @property
     def stats_tags(self) -> dict[str, str]:
-        """Metric tags for this task instance, including team_name when available."""
-        tags: dict[str, str] = {"dag_id": self.dag_id, "task_id": self.task_id}
+        """Metric tags for this task instance, including dag tags and team_name when available."""
+        tags: dict[str, str] = {}
+        if conf.getboolean("metrics", "dag_tags_in_metrics", fallback=False):
+            dag = getattr(self.task, "dag", None)
+            tags.update(build_dag_metric_tags(getattr(dag, "tags", None) or ()))

Review Comment:
   We are in the worker here, where the parsed dag exists. getattr should not 
be needed.
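
   To illustrate, a sketch of the direct access, assuming the parsed dag exposes `tags` as a collection of strings. `dag_tag_names` and the stand-in objects are hypothetical and only mimic the attribute shape.

```python
from types import SimpleNamespace


def dag_tag_names(ti) -> list[str]:
    """Hypothetical helper: read the dag's tag names via plain attribute access."""
    # On the worker, ti.task.dag is assumed to be the parsed dag,
    # so no getattr fallback is used here.
    return sorted(ti.task.dag.tags)


# Stand-in objects for illustration only; these are not Airflow classes.
ti = SimpleNamespace(
    task=SimpleNamespace(dag=SimpleNamespace(tags={"env:prod", "production"}))
)
print(dag_tag_names(ti))
```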



##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -6024,3 +6024,43 @@ def test_bad_declaration_is_skipped_not_fatal(self):
         with patch("airflow.sdk.execution_time.task_runner.allow_class", side_effect=ValueError("nope")):
             # Must not raise -- the walk swallows per-class registration errors.
             _register_deserialization_allowed_classes(dag, structlog.get_logger())
+
+
+class TestDagTagsForStats:

Review Comment:
   Ditto -- no new test class please



##########
shared/observability/src/airflow_shared/observability/metrics/tag_utils.py:
##########


Review Comment:
   Pet peeve of mine: utils files. They very quickly and easily become a 
dumping ground for anything and everything.
   
   Can we either not have "utils" in the name, or find a different existing 
place to put this fn?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
