This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7e1dace0fd [OTel Integration] Add tagging to existing stats (#30496)
7e1dace0fd is described below
commit 7e1dace0fd3eed2bbec2ae1e55ac586c9e3dd8cb
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Apr 17 17:58:46 2023 -0700
[OTel Integration] Add tagging to existing stats (#30496)
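Tag-aware stats backends (DogStatsD today, the OTel integration going forward) can filter on structured tags instead of parsing identifiers out of dotted metric names, so this change threads a tags dict through the existing Stats.incr, Stats.gauge, and Stats.timing call sites. A minimal sketch of the calling pattern, illustrative only (the real call sites are in the diff below; the file path is a made-up example):

    from airflow.stats import Stats

    file_path = "/dags/example_dag.py"  # hypothetical example value

    # Before: only a metric name plus positional count/rate arguments.
    Stats.incr("dag_file_refresh_error", 1, 1)

    # After: the same counter also carries structured tags.
    Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})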
---
airflow/dag_processing/processor.py | 6 ++---
airflow/executors/base_executor.py | 16 +++++++++---
airflow/jobs/scheduler_job_runner.py | 6 ++---
airflow/models/dag.py | 4 +--
airflow/models/dagrun.py | 32 ++++++++++++-----------
airflow/models/taskinstance.py | 39 +++++++++++++++--------------
airflow/stats.py | 13 +++++-----
tests/dag_processing/test_processor.py | 27 +++++++++-----------
tests/executors/test_base_executor.py | 8 +++---
tests/executors/test_celery_executor.py | 12 ++++++---
tests/executors/test_kubernetes_executor.py | 16 +++++++++---
tests/executors/test_local_executor.py | 12 ++++++---
tests/executors/test_sequential_executor.py | 16 +++++++++---
tests/jobs/test_scheduler_job.py | 22 ++++++++--------
tests/models/test_dag.py | 2 +-
tests/models/test_dagrun.py | 13 +++++-----
tests/models/test_taskinstance.py | 26 +++++++++++++------
17 files changed, 159 insertions(+), 111 deletions(-)
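A recurring piece in the diff below is a new stats_tags property on DagRun and TaskInstance that builds each tag dict with prune_dict from airflow.utils.helpers. A small sketch of the assumed behaviour (prune_dict in its default "strict" mode drops None-valued entries, so tags stay clean when an attribute is unset):

    from airflow.utils.helpers import prune_dict

    # A None run_type is pruned rather than emitted as a None tag:
    # prints {'dag_id': 'example_dag'}
    print(prune_dict({"dag_id": "example_dag", "run_type": None}))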
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 442431afb5..c6f07af0b6 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -459,9 +459,7 @@ class DagFileProcessor(LoggingMixin):
timestamp=ts,
)
sla_misses.append(sla_miss)
- Stats.incr(
- "sla_missed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
- )
+ Stats.incr("sla_missed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
if sla_misses:
session.add_all(sla_misses)
session.commit()
@@ -766,7 +764,7 @@ class DagFileProcessor(LoggingMixin):
return DagBag(file_path, include_examples=False)
except Exception:
cls.logger().exception("Failed at reloading the DAG file %s", file_path)
- Stats.incr("dag_file_refresh_error", 1, 1)
+ Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})
raise
@provide_session
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index e49938c458..4f9ac0c4f6 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -217,9 +217,19 @@ class BaseExecutor(LoggingMixin):
self.log.debug("%s in queue", num_queued_tasks)
self.log.debug("%s open slots", open_slots)
- Stats.gauge("executor.open_slots", open_slots)
- Stats.gauge("executor.queued_tasks", num_queued_tasks)
- Stats.gauge("executor.running_tasks", num_running_tasks)
+ Stats.gauge(
+ "executor.open_slots", value=open_slots, tags={"status": "open",
"name": self.__class__.__name__}
+ )
+ Stats.gauge(
+ "executor.queued_tasks",
+ value=num_queued_tasks,
+ tags={"status": "queued", "name": self.__class__.__name__},
+ )
+ Stats.gauge(
+ "executor.running_tasks",
+ value=num_running_tasks,
+ tags={"status": "running", "name": self.__class__.__name__},
+ )
self.trigger_tasks(open_slots)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index bfaefa2822..0f750e30c1 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -784,7 +784,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if ti_queued and not ti_requeued:
Stats.incr(
"scheduler.tasks.killed_externally",
- tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id":
ti.task_id},
+ tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
)
msg = (
"Executor reports task instance %s finished (%s) although
the "
@@ -1717,9 +1717,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
self.log.error("Detected zombie job: %s", request)
self.job.executor.send_callback(request)
- Stats.incr(
- "zombies_killed", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
- )
+ Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
@staticmethod
def _generate_zombie_message_details(ti: TI):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 9888e5cbd8..89becc04c7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1341,9 +1341,7 @@ class DAG(LoggingMixin):
callback(context)
except Exception:
self.log.exception("failed to invoke dag state update
callback")
- Stats.incr(
- "dag.callback_exceptions", tags={"dag_id":
dagrun.dag_id, "run_id": dagrun.run_id}
- )
+ Stats.incr("dag.callback_exceptions", tags={"dag_id":
dagrun.dag_id})
def get_active_runs(self):
"""
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index edb0ec78ac..427cf41c88 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -61,7 +61,7 @@ from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
from airflow.typing_compat import Literal
from airflow.utils import timezone
-from airflow.utils.helpers import is_container
+from airflow.utils.helpers import is_container, prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, tuple_in_condition, with_row_locks
@@ -233,6 +233,10 @@ class DagRun(Base, LoggingMixin):
external_trigger=self.external_trigger,
)
+ @property
+ def stats_tags(self) -> dict[str, str]:
+ return prune_dict({"dag_id": self.dag_id, "run_type": self.run_type})
+
@property
def logical_date(self) -> datetime:
return self.execution_date
@@ -562,7 +566,10 @@ class DagRun(Base, LoggingMixin):
start_dttm = timezone.utcnow()
self.last_scheduling_decision = start_dttm
- with Stats.timer(f"dagrun.dependency-check.{self.dag_id}"):
+ with Stats.timer(
+ f"dagrun.dependency-check.{self.dag_id}",
+ tags=self.stats_tags,
+ ):
dag = self.get_dag()
info = self.task_instance_scheduling_decisions(session)
@@ -894,12 +901,10 @@ class DagRun(Base, LoggingMixin):
data_interval_end = dag.get_run_data_interval(self).end
true_delay = first_start_date - data_interval_end
if true_delay.total_seconds() > 0:
- Stats.timing(f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay)
Stats.timing(
- "dagrun.first_task_scheduling_delay",
- true_delay,
- tags={"dag_id": dag.dag_id},
+ f"dagrun.{dag.dag_id}.first_task_scheduling_delay", true_delay, tags=self.stats_tags
)
+ Stats.timing("dagrun.first_task_scheduling_delay", true_delay, tags=self.stats_tags)
except Exception:
self.log.warning("Failed to record first_task_scheduling_delay
metric:", exc_info=True)
@@ -914,12 +919,9 @@ class DagRun(Base, LoggingMixin):
return
duration = self.end_date - self.start_date
- if self.state == State.SUCCESS:
- Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
- Stats.timing("dagrun.duration.success", duration, tags={"dag_id": self.dag_id})
- elif self.state == State.FAILED:
- Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)
- Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": self.dag_id})
+ timer_params = {"dt": duration, "tags": self.stats_tags}
+ Stats.timing(f"dagrun.duration.{self.state.value}.{self.dag_id}", **timer_params)
+ Stats.timing(f"dagrun.duration.{self.state.value}", **timer_params)
@provide_session
def verify_integrity(self, *, session: Session = NEW_SESSION) -> None:
@@ -982,14 +984,14 @@ class DagRun(Base, LoggingMixin):
should_restore_task = (task is not None) and ti.state == State.REMOVED
if should_restore_task:
self.log.info("Restoring task '%s' which was previously removed from DAG '%s'", ti, dag)
- Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1)
+ Stats.incr(f"task_restored_to_dag.{dag.dag_id}", tags=self.stats_tags)
ti.state = State.NONE
except AirflowException:
if ti.state == State.REMOVED:
pass # ti has already been removed, just ignore it
elif self.state != State.RUNNING and not dag.partial:
self.log.warning("Failed to get task '%s' for dag '%s'.
Marking it as removed.", ti, dag)
- Stats.incr(f"task_removed_from_dag.{dag.dag_id}", 1, 1)
+ Stats.incr(f"task_removed_from_dag.{dag.dag_id}",
tags=self.stats_tags)
ti.state = State.REMOVED
continue
@@ -1145,7 +1147,7 @@ class DagRun(Base, LoggingMixin):
session.bulk_save_objects(tasks)
for task_type, count in created_counts.items():
- Stats.incr(f"task_instance_created-{task_type}", count)
+ Stats.incr(f"task_instance_created-{task_type}", count,
tags=self.stats_tags)
session.flush()
except IntegrityError:
self.log.info(
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d4e37741c3..4039f72bb7 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -106,7 +106,7 @@ from airflow.typing_compat import Literal, TypeGuard
from airflow.utils import timezone
from airflow.utils.context import ConnectionAccessor, Context, VariableAccessor, context_merge
from airflow.utils.email import send_email
-from airflow.utils.helpers import render_template_to_string
+from airflow.utils.helpers import prune_dict, render_template_to_string
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.module_loading import qualname
from airflow.utils.net import get_hostname
@@ -543,6 +543,10 @@ class TaskInstance(Base, LoggingMixin):
# can be changed when calling 'run'
self.test_mode = False
+ @property
+ def stats_tags(self) -> dict[str, str]:
+ return prune_dict({"dag_id": self.dag_id, "task_id": self.task_id})
+
@staticmethod
def insert_mapping(run_id: str, task: Operator, map_index: int) -> dict[str, Any]:
""":meta private:"""
@@ -1262,12 +1266,7 @@ class TaskInstance(Base, LoggingMixin):
self.pid = None
if not ignore_all_deps and not ignore_ti_state and self.state == State.SUCCESS:
- Stats.incr(
- "previously_succeeded",
- 1,
- 1,
- tags={"dag_id": self.dag_id, "run_id": self.run_id, "task_id": self.task_id},
- )
+ Stats.incr("previously_succeeded", tags=self.stats_tags)
if not mark_success:
# Firstly find non-runnable and non-requeueable tis.
@@ -1412,10 +1411,14 @@ class TaskInstance(Base, LoggingMixin):
session.merge(self)
session.commit()
actual_start_date = timezone.utcnow()
- Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}")
+ Stats.incr(f"ti.start.{self.task.dag_id}.{self.task.task_id}",
tags=self.stats_tags)
# Initialize final state counters at zero
for state in State.task_states:
- Stats.incr(f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}", count=0)
+ Stats.incr(
+ f"ti.finish.{self.task.dag_id}.{self.task.task_id}.{state}",
+ count=0,
+ tags=self.stats_tags,
+ )
self.task = self.task.prepare_for_execution()
context = self.get_template_context(ignore_param_exceptions=False)
@@ -1485,7 +1488,7 @@ class TaskInstance(Base, LoggingMixin):
session.commit()
raise
finally:
- Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}")
+ Stats.incr(f"ti.finish.{self.dag_id}.{self.task_id}.{self.state}",
tags=self.stats_tags)
# Recording SKIPPED or SUCCESS
self.clear_next_method_args()
@@ -1547,7 +1550,7 @@ class TaskInstance(Base, LoggingMixin):
if not self.next_method:
self.clear_xcom_data()
- with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration"):
+ with Stats.timer(f"dag.{self.task.dag_id}.{self.task.task_id}.duration", tags=self.stats_tags):
# Set the validated/merged params on the task object.
self.task.params = context["params"]
@@ -1582,10 +1585,8 @@ class TaskInstance(Base, LoggingMixin):
# Run post_execute callback
self.task.post_execute(context=context, result=result)
- Stats.incr(f"operator_successes_{self.task.task_type}", 1, 1)
- Stats.incr(
- "ti_successes", tags={"dag_id": self.dag_id, "run_id":
self.run_id, "task_id": self.task_id}
- )
+ Stats.incr(f"operator_successes_{self.task.task_type}",
tags=self.stats_tags)
+ Stats.incr("ti_successes", tags=self.stats_tags)
def _run_finished_callback(
self,
@@ -1852,10 +1853,10 @@ class TaskInstance(Base, LoggingMixin):
self.end_date = timezone.utcnow()
self.set_duration()
- Stats.incr(f"operator_failures_{self.operator}")
- Stats.incr(
- "ti_failures", tags={"dag_id": self.dag_id, "run_id": self.run_id,
"task_id": self.task_id}
- )
+
+ Stats.incr(f"operator_failures_{self.operator}", tags=self.stats_tags)
+ Stats.incr("ti_failures", tags=self.stats_tags)
+
if not test_mode:
session.add(Log(State.FAILED, self))
diff --git a/airflow/stats.py b/airflow/stats.py
index 8785569682..da31e7d913 100644
--- a/airflow/stats.py
+++ b/airflow/stats.py
@@ -24,7 +24,7 @@ import socket
import string
import time
from functools import partial, wraps
-from typing import TYPE_CHECKING, Callable, Iterable, TypeVar, cast
+from typing import TYPE_CHECKING, Callable, Iterable, TypeVar, Union, cast
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
@@ -35,6 +35,7 @@ if TYPE_CHECKING:
from statsd import StatsClient
log = logging.getLogger(__name__)
+DeltaType = Union[int, float, datetime.timedelta]
class TimerProtocol(Protocol):
@@ -96,7 +97,7 @@ class StatsLogger(Protocol):
def timing(
cls,
stat: str,
- dt: int | float | datetime.timedelta,
+ dt: DeltaType | None,
*,
tags: dict[str, str] | None = None,
) -> None:
@@ -333,9 +334,7 @@ class NoStatsLogger:
"""Gauge stat."""
@classmethod
- def timing(
- cls, stat: str, dt: int | float | datetime.timedelta, *, tags: dict[str, str] | None = None
- ) -> None:
+ def timing(cls, stat: str, dt: DeltaType, *, tags: dict[str, str] | None = None) -> None:
"""Stats timing."""
@classmethod
@@ -410,7 +409,7 @@ class SafeStatsdLogger:
def timing(
self,
stat: str,
- dt: int | float | datetime.timedelta,
+ dt: DeltaType,
*,
tags: dict[str, str] | None = None,
) -> None:
@@ -514,7 +513,7 @@ class SafeDogStatsdLogger:
def timing(
self,
stat: str,
- dt: int | float | datetime.timedelta,
+ dt: DeltaType,
*,
tags: dict[str, str] | None = None,
) -> None:
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index a574dc88a5..11e681b209 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -248,9 +248,7 @@ class TestDagFileProcessor:
.count()
)
assert sla_miss_count == 1
- mock_stats_incr.assert_called_with(
- "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"}
- )
+ mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id": "test_sla_miss", "task_id": "dummy"})
# Now call manage_slas and see that it runs without errors
# because of existing SlaMiss above.
# Since this is run often, it's possible that it runs before another
@@ -300,9 +298,7 @@ class TestDagFileProcessor:
.count()
)
assert sla_miss_count == 2
- mock_stats_incr.assert_called_with(
- "sla_missed", tags={"dag_id": "test_sla_miss", "run_id": "test", "task_id": "dummy"}
- )
+ mock_stats_incr.assert_called_with("sla_missed", tags={"dag_id": "test_sla_miss", "task_id": "dummy"})
@patch.object(DagFileProcessor, "logger")
@mock.patch("airflow.dag_processing.processor.Stats.incr")
@@ -404,15 +400,18 @@ class TestDagFileProcessor:
sending an email
"""
session = settings.Session()
+ dag_id = "test_sla_miss"
+ task_id = "test_ti"
+ email = "[email protected]"
# Mock the callback function so we can verify that it was not called
mock_send_email.side_effect = RuntimeError("Could not send an email")
test_start_date = timezone.utcnow() - datetime.timedelta(days=1)
dag, task = create_dummy_dag(
- dag_id="test_sla_miss",
- task_id="dummy",
- email="[email protected]",
+ dag_id=dag_id,
+ task_id=task_id,
+ email=email,
default_args={"start_date": test_start_date, "sla":
datetime.timedelta(hours=1)},
)
mock_stats_incr.reset_mock()
@@ -420,7 +419,7 @@ class TestDagFileProcessor:
session.merge(TaskInstance(task=task, execution_date=test_start_date, state="Success"))
# Create an SlaMiss where notification was sent, but email was not
- session.merge(SlaMiss(task_id="dummy", dag_id="test_sla_miss",
execution_date=test_start_date))
+ session.merge(SlaMiss(task_id=task_id, dag_id=dag_id,
execution_date=test_start_date))
mock_log = mock.Mock()
mock_get_log.return_value = mock_log
@@ -428,13 +427,11 @@ class TestDagFileProcessor:
mock_dagbag.get_dag.return_value = dag
mock_get_dagbag.return_value = mock_dagbag
- DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session)
+ DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id=dag_id, session=session)
mock_log.exception.assert_called_once_with(
- "Could not send SLA Miss email notification for DAG %s", "test_sla_miss"
- )
- mock_stats_incr.assert_called_once_with(
- "sla_email_notification_failure", tags={"dag_id": "test_sla_miss"}
+ "Could not send SLA Miss email notification for DAG %s", dag_id
)
+ mock_stats_incr.assert_called_once_with("sla_email_notification_failure", tags={"dag_id": dag_id})
@mock.patch("airflow.dag_processing.processor.DagFileProcessor._get_dagbag")
def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_dummy_dag):
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 351fb614f4..b2059ae996 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -87,9 +87,11 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync)
executor = BaseExecutor()
executor.heartbeat()
calls = [
- mock.call("executor.open_slots", mock.ANY),
- mock.call("executor.queued_tasks", mock.ANY),
- mock.call("executor.running_tasks", mock.ANY),
+ mock.call("executor.open_slots", value=mock.ANY, tags={"status":
"open", "name": "BaseExecutor"}),
+ mock.call("executor.queued_tasks", value=mock.ANY, tags={"status":
"queued", "name": "BaseExecutor"}),
+ mock.call(
+ "executor.running_tasks", value=mock.ANY, tags={"status":
"running", "name": "BaseExecutor"}
+ ),
]
mock_stats_gauge.assert_has_calls(calls)
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index b24023264e..b2ea104be7 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -124,9 +124,15 @@ class TestCeleryExecutor:
executor = celery_executor.CeleryExecutor()
executor.heartbeat()
calls = [
- mock.call("executor.open_slots", mock.ANY),
- mock.call("executor.queued_tasks", mock.ANY),
- mock.call("executor.running_tasks", mock.ANY),
+ mock.call(
+ "executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "CeleryExecutor"}
+ ),
+ mock.call(
+ "executor.queued_tasks", value=mock.ANY, tags={"status": "queued", "name": "CeleryExecutor"}
+ ),
+ mock.call(
+ "executor.running_tasks", value=mock.ANY, tags={"status": "running", "name": "CeleryExecutor"}
+ ),
]
mock_stats_gauge.assert_has_calls(calls)
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index ea261b287c..c2b7d24c55 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -396,9 +396,19 @@ class TestKubernetesExecutor:
executor = self.kubernetes_executor
executor.heartbeat()
calls = [
- mock.call("executor.open_slots", mock.ANY),
- mock.call("executor.queued_tasks", mock.ANY),
- mock.call("executor.running_tasks", mock.ANY),
+ mock.call(
+ "executor.open_slots", value=mock.ANY, tags={"status": "open",
"name": "KubernetesExecutor"}
+ ),
+ mock.call(
+ "executor.queued_tasks",
+ value=mock.ANY,
+ tags={"status": "queued", "name": "KubernetesExecutor"},
+ ),
+ mock.call(
+ "executor.running_tasks",
+ value=mock.ANY,
+ tags={"status": "running", "name": "KubernetesExecutor"},
+ ),
]
mock_stats_gauge.assert_has_calls(calls)
diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py
index d292d20154..a69e158612 100644
--- a/tests/executors/test_local_executor.py
+++ b/tests/executors/test_local_executor.py
@@ -132,8 +132,14 @@ class TestLocalExecutor:
executor = LocalExecutor()
executor.heartbeat()
calls = [
- mock.call("executor.open_slots", mock.ANY),
- mock.call("executor.queued_tasks", mock.ANY),
- mock.call("executor.running_tasks", mock.ANY),
+ mock.call(
+ "executor.open_slots", value=mock.ANY, tags={"status": "open", "name": "LocalExecutor"}
+ ),
+ mock.call(
+ "executor.queued_tasks", value=mock.ANY, tags={"status": "queued", "name": "LocalExecutor"}
+ ),
+ mock.call(
+ "executor.running_tasks", value=mock.ANY, tags={"status": "running", "name": "LocalExecutor"}
+ ),
]
mock_stats_gauge.assert_has_calls(calls)
diff --git a/tests/executors/test_sequential_executor.py b/tests/executors/test_sequential_executor.py
index 94bea51857..46cf768a25 100644
--- a/tests/executors/test_sequential_executor.py
+++ b/tests/executors/test_sequential_executor.py
@@ -48,8 +48,18 @@ class TestSequentialExecutor:
executor = SequentialExecutor()
executor.heartbeat()
calls = [
- mock.call("executor.open_slots", mock.ANY),
- mock.call("executor.queued_tasks", mock.ANY),
- mock.call("executor.running_tasks", mock.ANY),
+ mock.call(
+ "executor.open_slots", value=mock.ANY, tags={"status": "open",
"name": "SequentialExecutor"}
+ ),
+ mock.call(
+ "executor.queued_tasks",
+ value=mock.ANY,
+ tags={"status": "queued", "name": "SequentialExecutor"},
+ ),
+ mock.call(
+ "executor.running_tasks",
+ value=mock.ANY,
+ tags={"status": "running", "name": "SequentialExecutor"},
+ ),
]
mock_stats_gauge.assert_has_calls(calls)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 3595623260..5be7612790 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -269,12 +269,10 @@ class TestSchedulerJob:
[
mock.call(
"scheduler.tasks.killed_externally",
- tags={"dag_id": dag_id, "run_id": ti1.run_id, "task_id":
ti1.task_id},
- ),
- mock.call("operator_failures_EmptyOperator"),
- mock.call(
- "ti_failures", tags={"dag_id": dag_id, "run_id":
ti1.run_id, "task_id": ti1.task_id}
+ tags={"dag_id": dag_id, "task_id": ti1.task_id},
),
+ mock.call("operator_failures_EmptyOperator", tags={"dag_id":
dag_id, "task_id": ti1.task_id}),
+ mock.call("ti_failures", tags={"dag_id": dag_id, "task_id":
ti1.task_id}),
],
any_order=True,
)
@@ -283,7 +281,8 @@ class TestSchedulerJob:
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_task_callback, dag_maker):
dag_id = "test_process_executor_events_with_no_callback"
- task_id_1 = "dummy_task"
+ task_id = "test_task"
+ run_id = "test_run"
mock_stats_incr.reset_mock()
executor = MockExecutor(do_update=False)
@@ -295,9 +294,9 @@ class TestSchedulerJob:
session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
- task1 = EmptyOperator(task_id=task_id_1, retries=1)
+ task1 = EmptyOperator(task_id=task_id, retries=1)
ti1 = dag_maker.create_dagrun(
- run_id="dr2", execution_date=DEFAULT_DATE + timedelta(hours=1)
+ run_id=run_id, execution_date=DEFAULT_DATE + timedelta(hours=1)
).get_task_instance(task1.task_id)
mock_stats_incr.reset_mock()
@@ -333,10 +332,10 @@ class TestSchedulerJob:
[
mock.call(
"scheduler.tasks.killed_externally",
- tags={"dag_id": dag_id, "run_id": "dr2", "task_id":
task_id_1},
+ tags={"dag_id": dag_id, "task_id": task_id},
),
- mock.call("operator_failures_EmptyOperator"),
- mock.call("ti_failures", tags={"dag_id": dag_id, "run_id":
"dr2", "task_id": task_id_1}),
+ mock.call("operator_failures_EmptyOperator", tags={"dag_id":
dag_id, "task_id": task_id}),
+ mock.call("ti_failures", tags={"dag_id": dag_id, "task_id":
task_id}),
],
any_order=True,
)
@@ -387,7 +386,6 @@ class TestSchedulerJob:
"scheduler.tasks.killed_externally",
tags={
"dag_id": "test_process_executor_events_with_callback",
- "run_id": "test",
"task_id": "dummy_task",
},
)
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index b4ff9a0bed..d9f269bca6 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -1482,7 +1482,7 @@ class TestDag:
mock_stats.incr.assert_called_with(
"dag.callback_exceptions",
- tags={"dag_id": "test_dag_callback_crash", "run_id":
"manual__2015-01-02T00:00:00+00:00"},
+ tags={"dag_id": "test_dag_callback_crash"},
)
dag.clear()
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5ffbdaae02..a053fc6ef8 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -854,6 +854,7 @@ class TestDagRun:
"""
dag = DAG(dag_id="test_emit_dag_stats", start_date=DEFAULT_DATE,
schedule=schedule_interval)
dag_task = EmptyOperator(task_id="dummy", dag=dag, owner="airflow")
+ expected_stat_tags = {"dag_id": f"{dag.dag_id}", "run_type":
DagRunType.SCHEDULED}
try:
info = dag.next_dagrun_info(None)
@@ -888,12 +889,9 @@ class TestDagRun:
if expected:
true_delay = ti.start_date - dag_run.data_interval_end
- sched_delay_stat_call = call(
- metric_name,
- true_delay,
- )
+ sched_delay_stat_call = call(metric_name, true_delay, tags=expected_stat_tags)
sched_delay_stat_call_with_tags = call(
- "dagrun.first_task_scheduling_delay", true_delay, tags={"dag_id": f"{dag.dag_id}"}
+ "dagrun.first_task_scheduling_delay", true_delay, tags=expected_stat_tags
)
assert (
sched_delay_stat_call in stats_mock.mock_calls
@@ -940,6 +938,7 @@ class TestDagRun:
@mock.patch.object(Stats, "incr")
def test_verify_integrity_task_start_and_end_date(Stats_incr, session, run_type, expected_tis):
"""Test that tasks with specific dates are only created for backfill runs"""
+
with DAG("test", start_date=DEFAULT_DATE) as dag:
EmptyOperator(task_id="without")
EmptyOperator(task_id="with_start_date", start_date=DEFAULT_DATE +
datetime.timedelta(1))
@@ -960,7 +959,9 @@ def test_verify_integrity_task_start_and_end_date(Stats_incr, session, run_type,
tis = dag_run.task_instances
assert len(tis) == expected_tis
- Stats_incr.assert_called_with("task_instance_created-EmptyOperator",
expected_tis)
+ Stats_incr.assert_called_with(
+ "task_instance_created-EmptyOperator", expected_tis, tags={"dag_id":
"test", "run_type": run_type}
+ )
@pytest.mark.parametrize("is_noop", [True, False])
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index fb91982c80..098a0acd04 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2529,6 +2529,7 @@ class TestTaskInstance:
ti.task = None
ti.state = State.QUEUED
session.flush()
+ expected_stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
assert ti.task is None, "Check critical pre-condition"
@@ -2542,10 +2543,8 @@ class TestTaskInstance:
# Check 'ti.try_number' is bumped to 2. This is try_number for next run
assert ti.try_number == 2
- Stats_incr.assert_any_call(
- "ti_failures", tags={"dag_id": ti.dag_id, "run_id": ti.run_id, "task_id": ti.task_id}
- )
- Stats_incr.assert_any_call("operator_failures_EmptyOperator")
+ Stats_incr.assert_any_call("ti_failures", tags=expected_stats_tags)
+ Stats_incr.assert_any_call("operator_failures_EmptyOperator", tags=expected_stats_tags)
def test_handle_failure_task_undefined(self, create_task_instance):
"""
@@ -2654,10 +2653,23 @@ class TestTaskInstance:
session.commit()
ti._run_raw_task()
ti.refresh_from_db()
- stats_mock.assert_called_with(f"ti.finish.{ti.dag_id}.{ti.task_id}.{ti.state}")
+ stats_mock.assert_called_with(
+ f"ti.finish.{ti.dag_id}.{ti.task_id}.{ti.state}",
+ tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+ )
for state in State.task_states:
- assert call(f"ti.finish.{ti.dag_id}.{ti.task_id}.{state}",
count=0) in stats_mock.mock_calls
- assert call(f"ti.start.{ti.dag_id}.{ti.task_id}") in
stats_mock.mock_calls
+ assert (
+ call(
+ f"ti.finish.{ti.dag_id}.{ti.task_id}.{state}",
+ count=0,
+ tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+ )
+ in stats_mock.mock_calls
+ )
+ assert (
+ call(f"ti.start.{ti.dag_id}.{ti.task_id}", tags={"dag_id":
ti.dag_id, "task_id": ti.task_id})
+ in stats_mock.mock_calls
+ )
assert stats_mock.call_count == len(State.task_states) + 4
def test_command_as_list(self, create_task_instance):