This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e1dace0fd [OTel Integration] Add tagging to existing stats (#30496)
7e1dace0fd is described below

commit 7e1dace0fd3eed2bbec2ae1e55ac586c9e3dd8cb
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Apr 17 17:58:46 2023 -0700

    [OTel Integration] Add tagging to existing stats (#30496)
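    
    A minimal sketch of the calling convention this change adopts, for
    readers skimming the diff (the metric names appear in the diff below;
    the literal tag values and file path are hypothetical):
    
        from airflow.stats import Stats
    
        # Before: bare stat names with positional count/rate arguments:
        #     Stats.incr("dag_file_refresh_error", 1, 1)
        # After: callers pass a tags dict that statsd/DogStatsd/OTel-style
        # backends can attach to the metric as dimensions:
        Stats.incr("dag_file_refresh_error", tags={"file_path": "/dags/example.py"})
        Stats.timing(
            "dagrun.first_task_scheduling_delay",
            1.5,  # dt accepts int, float, or datetime.timedelta (DeltaType)
            tags={"dag_id": "example_dag", "run_type": "scheduled"},
        )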
---
 airflow/dag_processing/processor.py         |  6 ++---
 airflow/executors/base_executor.py          | 16 +++++++++---
 airflow/jobs/scheduler_job_runner.py        |  6 ++---
 airflow/models/dag.py                       |  4 +--
 airflow/models/dagrun.py                    | 32 ++++++++++++-----------
 airflow/models/taskinstance.py              | 39 +++++++++++++++--------------
 airflow/stats.py                            | 13 +++++-----
 tests/dag_processing/test_processor.py      | 27 +++++++++-----------
 tests/executors/test_base_executor.py       |  8 +++---
 tests/executors/test_celery_executor.py     | 12 ++++++---
 tests/executors/test_kubernetes_executor.py | 16 +++++++++---
 tests/executors/test_local_executor.py      | 12 ++++++---
 tests/executors/test_sequential_executor.py | 16 +++++++++---
 tests/jobs/test_scheduler_job.py            | 22 ++++++++--------
 tests/models/test_dag.py                    |  2 +-
 tests/models/test_dagrun.py                 | 13 +++++-----
 tests/models/test_taskinstance.py           | 26 +++++++++++++------
 17 files changed, 159 insertions(+), 111 deletions(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 442431afb5..c6f07af0b6 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -459,9 +459,7 @@ class DagFileProcessor(LoggingMixin):
                         timestamp=ts,
                     )
                     sla_misses.append(sla_miss)
-                    Stats.incr(
-                        "sla_missed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
-                    )
+                    Stats.incr("sla_missed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
             if sla_misses:
                 session.add_all(sla_misses)
         session.commit()
@@ -766,7 +764,7 @@ class DagFileProcessor(LoggingMixin):
             return DagBag(file_path, include_examples=False)
         except Exception:
             cls.logger().exception("Failed at reloading the DAG file %s", file_path)
-            Stats.incr("dag_file_refresh_error", 1, 1)
+            Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})
             raise
 
     @provide_session
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index e49938c458..4f9ac0c4f6 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -217,9 +217,19 @@ class BaseExecutor(LoggingMixin):
         self.log.debug("%s in queue", num_queued_tasks)
         self.log.debug("%s open slots", open_slots)
 
-        Stats.gauge("executor.open_slots", open_slots)
-        Stats.gauge("executor.queued_tasks", num_queued_tasks)
-        Stats.gauge("executor.running_tasks", num_running_tasks)
+        Stats.gauge(
+            "executor.open_slots", value=open_slots, tags={"status": "open", "name": self.__class__.__name__}
+        )
+        Stats.gauge(
+            "executor.queued_tasks",
+            value=num_queued_tasks,
+            tags={"status": "queued", "name": self.__class__.__name__},
+        )
+        Stats.gauge(
+            "executor.running_tasks",
+            value=num_running_tasks,
+            tags={"status": "running", "name": self.__class__.__name__},
+        )
 
         self.trigger_tasks(open_slots)
 
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index bfaefa2822..0f750e30c1 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -784,7 +784,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             if ti_queued and not ti_requeued:
                 Stats.incr(
                     "scheduler.tasks.killed_externally",
-                    tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id},
+                    tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
                 )
                 msg = (
                     "Executor reports task instance %s finished (%s) although 
the "
@@ -1717,9 +1717,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             )
             self.log.error("Detected zombie job: %s", request)
             self.job.executor.send_callback(request)
-            Stats.incr(
-                "zombies_killed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
-            )
+            Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
 
     @staticmethod
     def _generate_zombie_message_details(ti: TI):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9888e5cbd8..89becc04c7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1341,9 +1341,7 @@ class DAG(LoggingMixin):
                     callback(context)
                 except Exception:
                     self.log.exception("failed to invoke dag state update 
callback")
-                    Stats.incr(
-                        "dag.callback_exceptions", tags={"dag_id": dagrun.dag_id, "run_id": dagrun.run_id}
-                    )
+                    Stats.incr("dag.callback_exceptions", tags={"dag_id": dagrun.dag_id})
 
     def get_active_runs(self):
         """
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index edb0ec78ac..427cf41c88 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -61,7 +61,7 @@ from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
 from airflow.typing_compat import Literal
 from airflow.utils import timezone
-from airflow.utils.helpers import is_container
+from airflow.utils.helpers import is_container, prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, tuple_in_condition, with_row_locks
@@ -233,6 +233,10 @@ class DagRun(Base, LoggingMixin):
             external_trigger=self.external_trigger,
         )
 
+    @property
+    def stats_tags(self) -> dict[str, str]:
+        return prune_dict({"dag_id": self.dag_id, "run_type": self.run_type})
+
     @property
     def logical_date(self) -> datetime:
         return self.execution_date
@@ -562,7 +566,10 @@ class DagRun(Base, LoggingMixin):
 
         start_dttm = timezone.utcnow()
         self.last_scheduling_decision = start_dttm
-        with Stats.timer(f"dagrun.dependency-check.{self.dag_id}"):
+        with Stats.timer(
+            f"dagrun.dependency-check.{self.dag_id}",
+            tags=self.stats_tags,
+        ):
             dag = self.get_dag()
             info = self.task_instance_scheduling_decisions(session)
 
@@ -894,12 +901,10 @@ class DagRun(Base, LoggingMixin):
                 data_interval_end = dag.get_run_data_interval(self).end
                 true_delay = first_start_date - data_interval_end
                 if true_delay.total_seconds() > 0:
-                    Stats.timing(f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay)
                     Stats.timing(
-                        "dagrun.first_task_scheduling_delay",
-                        true_delay,
-                        tags={"dag_id": dag.dag_id},
+                        f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay, tags=self.stats_tags
                     )
+                    Stats.timing("dagrun.first_task_scheduling_delay", true_delay, tags=self.stats_tags)
         except Exception:
             self.log.warning("Failed to record first_task_scheduling_delay 
metric:", exc_info=True)
 
@@ -914,12 +919,9 @@ class DagRun(Base, LoggingMixin):
             return
 
         duration = self.end_date - self.start_date
-        if self.state == State.SUCCESS:
-            Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
-            Stats.timing("dagrun.duration.success", duration, tags={"dag_id": self.dag_id})
-        elif self.state == State.FAILED:
-            Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)
-            Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": self.dag_id})
+        timer_params = {"dt": duration, "tags": self.stats_tags}
+        Stats.timing(f"dagrun.duration.{self.state.value}.{self.dag_id}", **timer_params)
+        Stats.timing(f"dagrun.duration.{self.state.value}", **timer_params)
 
     @provide_session
     def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
@@ -982,14 +984,14 @@ class DagRun(Base, LoggingMixin):
                 should_restore_task = (task is not None) and ti.state == State.REMOVED
                 if should_restore_task:
                     self.log.info("Restoring task '%s' which was previously 
removed from DAG '%s'", ti, dag)
-                    Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1)
+                    Stats.incr(f"task_restored_to_dag.{dag.dag_id}", tags=self.stats_tags)
                     ti.state = State.NONE
             except AirflowException:
                 if ti.state == State.REMOVED:
                     pass  # ti has already been removed, just ignore it
                 elif self.state != State.RUNNING and not dag.partial:
                     self.log.warning("Failed to get task '%s' for dag '%s'. 
Marking it as removed.", ti, dag)
-                    Stats.incr(f"task_removed_from_dag.{dag.dag_id}", 1, 1)
+                    Stats.incr(f"task_removed_from_dag.{dag.dag_id}", tags=self.stats_tags)
                     ti.state = State.REMOVED
                 continue
 
@@ -1145,7 +1147,7 @@ class DagRun(Base, LoggingMixin):
                 session.bulk_save_objects(tasks)
 
             for task_type, count in created_counts.items():
-                Stats.incr(f"task_instance_created-{task_type}", count)
+                Stats.incr(f"task_instance_created-{task_type}", count, tags=self.stats_tags)
             session.flush()
         except IntegrityError:
             self.log.info(
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d4e37741c3..4039f72bb7 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -106,7 +106,7 @@ from airflow.typing_compat import Literal, TypeGuard
 from airflow.utils import timezone
 from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor, context_merge
 from airflow.utils.email import send_email
-from airflow.utils.helpers import render_template_to_string
+from airflow.utils.helpers import prune_dict, render_template_to_string
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import qualname
 from airflow.utils.net import get_hostname
@@ -543,6 +543,10 @@ class TaskInstance(Base, LoggingMixin):
         # can be changed when calling 'run'
         self.test_mode = False
 
+    @property
+    def stats_tags(self) -> dict[str, str]:
+        return prune_dict({"dag_id": self.dag_id, "task_id": self.task_id})
+
     @staticmethod
     def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any]:
         """:meta private:"""
@@ -1262,12 +1266,7 @@ class TaskInstance(Base, LoggingMixin):
         self.pid = None
 
         if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS:
-            Stats.incr(
-                "previously_succeeded",
-                1,
-                1,
-                tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id},
-            )
+            Stats.incr("previously_succeeded", tags=self.stats_tags)
 
         if not mark_success:
             # Firstly find non-runnable and non-requeueable tis.
@@ -1412,10 +1411,14 @@ class TaskInstance(Base, LoggingMixin):
             session.merge(self)
             session.commit()
         actual_start_date = timezone.utcnow()
-        Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}")
+        Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}", tags=self.stats_tags)
         # Initialize final state counters at zero
         for state in State.task_states:
-            Stats.incr(f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}", count=0)
+            Stats.incr(
+                f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}",
+                count=0,
+                tags=self.stats_tags,
+            )
 
         self.task = self.task.prepare_for_execution()
         context = self.get_template_context(ignore_param_exceptions=False)
@@ -1485,7 +1488,7 @@ class TaskInstance(Base, LoggingMixin):
             session.commit()
             raise
         finally:
-            Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}")
+            Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}", tags=self.stats_tags)
 
         # Recording SKIPPED or SUCCESS
         self.clear_next_method_args()
@@ -1547,7 +1550,7 @@ class TaskInstance(Base, LoggingMixin):
         if not self.next_method:
             self.clear_xcom_data()
 
-        with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration"):
+        with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration", tags=self.stats_tags):
             # Set the validated/merged params on the task object.
             self.task.params = context["params"]
 
@@ -1582,10 +1585,8 @@ class TaskInstance(Base, LoggingMixin):
             # Run post_execute callback
             self.task.post_execute(context=context, result=result)
 
-        Stats.incr(f"operator_successes_{self.task.task_type}", 1, 1)
-        Stats.incr(
-            "ti_successes", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
-        )
+        Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags)
+        Stats.incr("ti_successes", tags=self.stats_tags)
 
     def _run_finished_callback(
         self,
@@ -1852,10 +1853,10 @@ class TaskInstance(Base, LoggingMixin):
 
         self.end_date = timezone.utcnow()
         self.set_duration()
-        Stats.incr(f"operator_failures_{self.operator}")
-        Stats.incr(
-            "ti_failures", tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id}
-        )
+
+        Stats.incr(f"operator_failures_{self.operator}", tags=self.stats_tags)
+        Stats.incr("ti_failures", tags=self.stats_tags)
+
         if not test_mode:
             session.add(Log(State.FAILED, self))
 
diff --git a/airflow/stats.py b/airflow/stats.py
index 8785569682..da31e7d913 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -24,7 +24,7 @@ import socket
 import string
 import time
 from functools import partial, wraps
-from typing import TYPE_CHECKING, Callable, Iterable, TypeVar, cast
+from typing import TYPE_CHECKING, Callable, Iterable, TypeVar, Union, cast
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
     from statsd import StatsClient
 
 log = logging.getLogger(__name__)
+DeltaType = Union[int, float, datetime.timedelta]
 
 
 class TimerProtocol(Protocol):
@@ -96,7 +97,7 @@ class StatsLogger(Protocol):
     def timing(
         cls,
         stat: str,
-        dt: int | float | datetime.timedelta,
+        dt: DeltaType | None,
         *,
         tags: dict[str, str] | None = None,
     ) -> None:
@@ -333,9 +334,7 @@ class NoStatsLogger:
         """Gauge stat."""
 
     @classmethod
-    def timing(
-        cls, stat: str, dt: int | float | datetime.timedelta, *, tags: dict[str, str] | None = None
-    ) -> None:
+    def timing(cls, stat: str, dt: DeltaType, *, tags: dict[str, str] | None = None) -> None:
         """Stats timing."""
 
     @classmethod
@@ -410,7 +409,7 @@ class SafeStatsdLogger:
     def timing(
         self,
         stat: str,
-        dt: int | float | datetime.timedelta,
+        dt: DeltaType,
         *,
         tags: dict[str, str] | None = None,
     ) -> None:
@@ -514,7 +513,7 @@ class SafeDogStatsdLogger:
     def timing(
         self,
         stat: str,
-        dt: int | float | datetime.timedelta,
+        dt: DeltaType,
         *,
         tags: dict[str, str] | None = None,
     ) -> None:
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index a574dc88a5..11e681b209 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -248,9 +248,7 @@ class TestDagFileProcessor:
             .count()
         )
         assert sla_miss_count == 1
-        mock_stats_incr.assert_called_with(
-            "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"}
-        )
+        mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id": "test_sla_miss", "task_id": "dummy"})
         # Now call manage_slas and see that it runs without errors
         # because of existing SlaMiss above.
         # Since this is run often, it's possible that it runs before another
@@ -300,9 +298,7 @@ class TestDagFileProcessor:
             .count()
         )
         assert sla_miss_count == 2
-        mock_stats_incr.assert_called_with(
-            "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"}
-        )
+        mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id": "test_sla_miss", "task_id": "dummy"})
 
     @patch.object(DagFileProcessor, "logger")
     @mock.patch("airflow.dag_processing.processor.Stats.incr")
@@ -404,15 +400,18 @@ class TestDagFileProcessor:
         sending an email
         """
         session = settings.Session()
+        dag_id = "test_sla_miss"
+        task_id = "test_ti"
+        email = "[email protected]"
 
         # Mock the callback function so we can verify that it was not called
         mock_send_email.side_effect = RuntimeError("Could not send an email")
 
         test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
         dag, task = create_dummy_dag(
-            dag_id="test_sla_miss",
-            task_id="dummy",
-            email="[email protected]",
+            dag_id=dag_id,
+            task_id=task_id,
+            email=email,
             default_args={"start_date": test_start_date, "sla": 
datetime.timedelta(hours=1)},
         )
         mock_stats_incr.reset_mock()
@@ -420,7 +419,7 @@ class TestDagFileProcessor:
         session.merge(TaskInstance(task=task, execution_date=test_start_date, state="Success"))
 
         # Create an SlaMiss where notification was sent, but email was not
-        session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss", execution_date=test_start_date))
+        session.merge(SlaMiss(task_id=task_id, dag_id=dag_id, execution_date=test_start_date))
 
         mock_log = mock.Mock()
         mock_get_log.return_value = mock_log
@@ -428,13 +427,11 @@ class TestDagFileProcessor:
         mock_dagbag.get_dag.return_value = dag
         mock_get_dagbag.return_value = mock_dagbag
 
-        DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session)
+        DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id=dag_id, session=session)
         mock_log.exception.assert_called_once_with(
-            "Could not send SLA Miss email notification for DAG %s", "test_sla_miss"
-        )
-        mock_stats_incr.assert_called_once_with(
-            "sla_email_notification_failure", tags={"dag_id": "test_sla_miss"}
+            "Could not send SLA Miss email notification for DAG %s", dag_id
         )
+        mock_stats_incr.assert_called_once_with("sla_email_notification_failure", tags={"dag_id": dag_id})

     @mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
     def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_dummy_dag):
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 351fb614f4..b2059ae996 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -87,9 +87,11 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync)
     executor = BaseExecutor()
     executor.heartbeat()
     calls = [
-        mock.call("executor.open_slots", mock.ANY),
-        mock.call("executor.queued_tasks", mock.ANY),
-        mock.call("executor.running_tasks", mock.ANY),
+        mock.call("executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "BaseExecutor"}),
+        mock.call("executor.queued_tasks", value=mock.ANY, tags={"status": "queued", "name": "BaseExecutor"}),
+        mock.call(
+            "executor.running_tasks", value=mock.ANY, tags={"status": "running", "name": "BaseExecutor"}
+        ),
     ]
     mock_stats_gauge.assert_has_calls(calls)
 
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index b24023264e..b2ea104be7 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -124,9 +124,15 @@ class TestCeleryExecutor:
         executor = celery_executor.CeleryExecutor()
         executor.heartbeat()
         calls = [
-            mock.call("executor.open_slots", mock.ANY),
-            mock.call("executor.queued_tasks", mock.ANY),
-            mock.call("executor.running_tasks", mock.ANY),
+            mock.call(
+                "executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "CeleryExecutor"}
+            ),
+            mock.call(
+                "executor.queued_tasks", value=mock.ANY, tags={"status": "queued", "name": "CeleryExecutor"}
+            ),
+            mock.call(
+                "executor.running_tasks", value=mock.ANY, tags={"status": "running", "name": "CeleryExecutor"}
+            ),
         ]
         mock_stats_gauge.assert_has_calls(calls)
 
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index ea261b287c..c2b7d24c55 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -396,9 +396,19 @@ class TestKubernetesExecutor:
         executor = self.kubernetes_executor
         executor.heartbeat()
         calls = [
-            mock.call("executor.open_slots", mock.ANY),
-            mock.call("executor.queued_tasks", mock.ANY),
-            mock.call("executor.running_tasks", mock.ANY),
+            mock.call(
+                "executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "KubernetesExecutor"}
+            ),
+            mock.call(
+                "executor.queued_tasks",
+                value=mock.ANY,
+                tags={"status": "queued", "name": "KubernetesExecutor"},
+            ),
+            mock.call(
+                "executor.running_tasks",
+                value=mock.ANY,
+                tags={"status": "running", "name": "KubernetesExecutor"},
+            ),
         ]
         mock_stats_gauge.assert_has_calls(calls)
 
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index d292d20154..a69e158612 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -132,8 +132,14 @@ class TestLocalExecutor:
         executor = LocalExecutor()
         executor.heartbeat()
         calls = [
-            mock.call("executor.open_slots", mock.ANY),
-            mock.call("executor.queued_tasks", mock.ANY),
-            mock.call("executor.running_tasks", mock.ANY),
+            mock.call(
+                "executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "LocalExecutor"}
+            ),
+            mock.call(
+                "executor.queued_tasks", value=mock.ANY, tags={"status": "queued", "name": "LocalExecutor"}
+            ),
+            mock.call(
+                "executor.running_tasks", value=mock.ANY, tags={"status": "running", "name": "LocalExecutor"}
+            ),
         ]
         mock_stats_gauge.assert_has_calls(calls)
diff --git a/tests/executors/test_sequential_executor.py b/tests/executors/test_sequential_executor.py
index 94bea51857..46cf768a25 100644
--- a/tests/executors/test_sequential_executor.py
+++ b/tests/executors/test_sequential_executor.py
@@ -48,8 +48,18 @@ class TestSequentialExecutor:
         executor = SequentialExecutor()
         executor.heartbeat()
         calls = [
-            mock.call("executor.open_slots", mock.ANY),
-            mock.call("executor.queued_tasks", mock.ANY),
-            mock.call("executor.running_tasks", mock.ANY),
+            mock.call(
+                "executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "SequentialExecutor"}
+            ),
+            mock.call(
+                "executor.queued_tasks",
+                value=mock.ANY,
+                tags={"status": "queued", "name": "SequentialExecutor"},
+            ),
+            mock.call(
+                "executor.running_tasks",
+                value=mock.ANY,
+                tags={"status": "running", "name": "SequentialExecutor"},
+            ),
         ]
         mock_stats_gauge.assert_has_calls(calls)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 3595623260..5be7612790 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -269,12 +269,10 @@ class TestSchedulerJob:
             [
                 mock.call(
                     "scheduler.tasks.killed_externally",
-                    tags={"dag_id": dag_id, "run_id": ti1.run_id, "task_id": ti1.task_id},
-                ),
-                mock.call("operator_failures_EmptyOperator"),
-                mock.call(
-                    "ti_failures", tags={"dag_id": dag_id, "run_id": ti1.run_id, "task_id": ti1.task_id}
+                    tags={"dag_id": dag_id, "task_id": ti1.task_id},
                 ),
+                mock.call("operator_failures_EmptyOperator", tags={"dag_id": dag_id, "task_id": ti1.task_id}),
+                mock.call("ti_failures", tags={"dag_id": dag_id, "task_id": ti1.task_id}),
             ],
             any_order=True,
         )
@@ -283,7 +281,8 @@ class TestSchedulerJob:
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
     def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_task_callback, dag_maker):
         dag_id = "test_process_executor_events_with_no_callback"
-        task_id_1 = "dummy_task"
+        task_id = "test_task"
+        run_id = "test_run"
 
         mock_stats_incr.reset_mock()
         executor = MockExecutor(do_update=False)
@@ -295,9 +294,9 @@ class TestSchedulerJob:
 
         session = settings.Session()
         with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
-            task1 = EmptyOperator(task_id=task_id_1, retries=1)
+            task1 = EmptyOperator(task_id=task_id, retries=1)
         ti1 = dag_maker.create_dagrun(
-            run_id="dr2", execution_date=DEFAULT_DATE + timedelta(hours=1)
+            run_id=run_id, execution_date=DEFAULT_DATE + timedelta(hours=1)
         ).get_task_instance(task1.task_id)
 
         mock_stats_incr.reset_mock()
@@ -333,10 +332,10 @@ class TestSchedulerJob:
             [
                 mock.call(
                     "scheduler.tasks.killed_externally",
-                    tags={"dag_id": dag_id, "run_id": "dr2", "task_id": task_id_1},
+                    tags={"dag_id": dag_id, "task_id": task_id},
                 ),
-                mock.call("operator_failures_EmptyOperator"),
-                mock.call("ti_failures", tags={"dag_id": dag_id, "run_id": "dr2", "task_id": task_id_1}),
+                mock.call("operator_failures_EmptyOperator", tags={"dag_id": dag_id, "task_id": task_id}),
+                mock.call("ti_failures", tags={"dag_id": dag_id, "task_id": task_id}),
             ],
             any_order=True,
         )
@@ -387,7 +386,6 @@ class TestSchedulerJob:
             "scheduler.tasks.killed_externally",
             tags={
                 "dag_id": "test_process_executor_events_with_callback",
-                "run_id": "test",
                 "task_id": "dummy_task",
             },
         )
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index b4ff9a0bed..d9f269bca6 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1482,7 +1482,7 @@ class TestDag:
 
         mock_stats.incr.assert_called_with(
             "dag.callback_exceptions",
-            tags={"dag_id": "test_dag_callback_crash", "run_id": "manual__2015-01-02T00:00:00+00:00"},
+            tags={"dag_id": "test_dag_callback_crash"},
         )
 
         dag.clear()
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5ffbdaae02..a053fc6ef8 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -854,6 +854,7 @@ class TestDagRun:
         """
         dag = DAG(dag_id="test_emit_dag_stats", start_date=DEFAULT_DATE, 
schedule=schedule_interval)
         dag_task = EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
+        expected_stat_tags = {"dag_id": f"{dag.dag_id}", "run_type": DagRunType.SCHEDULED}
 
         try:
             info = dag.next_dagrun_info(None)
@@ -888,12 +889,9 @@ class TestDagRun:
 
             if expected:
                 true_delay = ti.start_date - dag_run.data_interval_end
-                sched_delay_stat_call = call(
-                    metric_name,
-                    true_delay,
-                )
+                sched_delay_stat_call = call(metric_name, true_delay, tags=expected_stat_tags)
                 sched_delay_stat_call_with_tags = call(
-                    "dagrun.first_task_scheduling_delay", true_delay, tags={"dag_id": f"{dag.dag_id}"}
+                    "dagrun.first_task_scheduling_delay", true_delay, tags=expected_stat_tags
                 )
                 assert (
                     sched_delay_stat_call in stats_mock.mock_calls
@@ -940,6 +938,7 @@ class TestDagRun:
 @mock.patch.object(Stats, "incr")
 def test_verify_integrity_task_start_and_end_date(Stats_incr, session, run_type, expected_tis):
     """Test that tasks with specific dates are only created for backfill 
runs"""
+
     with DAG("test", start_date=DEFAULT_DATE) as dag:
         EmptyOperator(task_id="without")
         EmptyOperator(task_id="with_start_date", start_date=DEFAULT_DATE + 
datetime.timedelta(1))
@@ -960,7 +959,9 @@ def test_verify_integrity_task_start_and_end_date(Stats_incr, session, run_type,
     tis = dag_run.task_instances
     assert len(tis) == expected_tis
 
-    Stats_incr.assert_called_with("task_instance_created-EmptyOperator", expected_tis)
+    Stats_incr.assert_called_with(
+        "task_instance_created-EmptyOperator", expected_tis, tags={"dag_id": "test", "run_type": run_type}
+    )
 
 
 @pytest.mark.parametrize("is_noop", [True, False])
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index fb91982c80..098a0acd04 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2529,6 +2529,7 @@ class TestTaskInstance:
         ti.task = None
         ti.state = State.QUEUED
         session.flush()
+        expected_stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
 
         assert ti.task is None, "Check critical pre-condition"
 
@@ -2542,10 +2543,8 @@ class TestTaskInstance:
         # Check 'ti.try_number' is bumped to 2. This is try_number for next run
         assert ti.try_number == 2
 
-        Stats_incr.assert_any_call(
-            "ti_failures", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
-        )
-        Stats_incr.assert_any_call("operator_failures_EmptyOperator")
+        Stats_incr.assert_any_call("ti_failures", tags=expected_stats_tags)
+        Stats_incr.assert_any_call("operator_failures_EmptyOperator", tags=expected_stats_tags)
 
     def test_handle_failure_task_undefined(self, create_task_instance):
         """
@@ -2654,10 +2653,23 @@ class TestTaskInstance:
         session.commit()
         ti._run_raw_task()
         ti.refresh_from_db()
-        stats_mock.assert_called_with(f"ti.finish.{ti.dag_id}.{ti.task_id}.{ti.state}")
+        stats_mock.assert_called_with(
+            f"ti.finish.{ti.dag_id}.{ti.task_id}.{ti.state}",
+            tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+        )
         for state in State.task_states:
-            assert call(f"ti.finish.{ti.dag_id}.{ti.task_id}.{state}", count=0) in stats_mock.mock_calls
-        assert call(f"ti.start.{ti.dag_id}.{ti.task_id}") in stats_mock.mock_calls
+            assert (
+                call(
+                    f"ti.finish.{ti.dag_id}.{ti.task_id}.{state}",
+                    count=0,
+                    tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+                )
+                in stats_mock.mock_calls
+            )
+        assert (
+            call(f"ti.start.{ti.dag_id}.{ti.task_id}", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
+            in stats_mock.mock_calls
+        )
         assert stats_mock.call_count == len(State.task_states) + 4
 
     def test_command_as_list(self, create_task_instance):
