vandonr-amz commented on code in PR #30496:
URL: https://github.com/apache/airflow/pull/30496#discussion_r1162102946
##########
airflow/dag_processing/processor.py:
##########
@@ -534,7 +535,9 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) ->
email_sent = True
notification_sent = True
except Exception:
- Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id})
+ Stats.incr(
+ "sla_email_notification_failure", tags={"dag_id": dag.dag_id, "email": str(emails)}
Review Comment:
I think this `email` tag is going to be unusable: in some cases it will contain a long list of emails, and I'm not sure you can query a tag by part of its value.
Also, IMHO a list of emails belongs in the logs rather than in metrics. What use case do you see for filtering or grouping metrics by the list of emails notified?
##########
airflow/stats.py:
##########
@@ -35,6 +35,7 @@
from statsd import StatsClient
log = logging.getLogger(__name__)
+DeltaType = Union[int, float, datetime.timedelta]
Review Comment:
nice 👍
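For illustration, code consuming the alias might normalize it before emitting a timer. This is a sketch with a hypothetical `to_milliseconds` helper, assuming the StatsD convention (also followed by the Python `statsd` client) that plain numbers are already in milliseconds:

```python
from __future__ import annotations

import datetime
from typing import Union

DeltaType = Union[int, float, datetime.timedelta]


def to_milliseconds(dt: DeltaType) -> float:
    """Hypothetical helper: convert a DeltaType to milliseconds.

    Assumption: int/float values are already milliseconds (StatsD timer unit).
    """
    if isinstance(dt, datetime.timedelta):
        return dt.total_seconds() * 1000
    return float(dt)
```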
##########
airflow/executors/base_executor.py:
##########
@@ -216,9 +216,10 @@ def heartbeat(self) -> None:
self.log.debug("%s in queue", num_queued_tasks)
self.log.debug("%s open slots", open_slots)
- Stats.gauge("executor.open_slots", open_slots)
- Stats.gauge("executor.queued_tasks", num_queued_tasks)
- Stats.gauge("executor.running_tasks", num_running_tasks)
+ name = {"name": self.__class__.__name__}
+ Stats.gauge("executor.open_slots", value=open_slots, tags={"status": "open", **name})
+ Stats.gauge("executor.queued_tasks", value=num_queued_tasks, tags={"status": "queued", **name})
+ Stats.gauge("executor.running_tasks", value=num_running_tasks, tags={"status": "running", **name})
Review Comment:
Don't you think `tags={"status": "xxx", "name": self.__class__.__name__}` would be more readable?
You could also extract `class_name = self.__class__.__name__` to avoid repeating it, but passing it through `**name` unpacking like this looks odd to me.
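A sketch of the suggested style, using a hypothetical `RecordingStats` stand-in and a hypothetical `ExampleExecutor` (neither is Airflow code) so it runs on its own: the class name is computed once into a local, and each tag dict is written out explicitly.

```python
from __future__ import annotations


class RecordingStats:
    """Hypothetical stand-in for airflow.stats.Stats that records gauge() calls."""

    def __init__(self) -> None:
        self.gauges: dict[str, tuple[float, dict[str, str]]] = {}

    def gauge(self, stat: str, value: float, tags: dict[str, str] | None = None) -> None:
        self.gauges[stat] = (value, dict(tags or {}))


class ExampleExecutor:
    """Hypothetical executor showing only the tagging pattern from the review."""

    def __init__(self, stats: RecordingStats) -> None:
        self.stats = stats

    def emit_slot_gauges(self, open_slots: int, num_queued_tasks: int, num_running_tasks: int) -> None:
        # Extract the class name once instead of building a {"name": ...} dict to unpack.
        class_name = self.__class__.__name__
        self.stats.gauge("executor.open_slots", value=open_slots, tags={"status": "open", "name": class_name})
        self.stats.gauge("executor.queued_tasks", value=num_queued_tasks, tags={"status": "queued", "name": class_name})
        self.stats.gauge("executor.running_tasks", value=num_running_tasks, tags={"status": "running", "name": class_name})


stats = RecordingStats()
ExampleExecutor(stats).emit_slot_gauges(open_slots=5, num_queued_tasks=3, num_running_tasks=2)
```

Each call now shows its full tag set at a glance, at the cost of repeating the `"name"` key three times.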
##########
airflow/models/taskinstance.py:
##########
@@ -540,6 +540,10 @@ def __init__(
# can be changed when calling 'run'
self.test_mode = False
+ @property
+ def stats_tags(self) -> dict[str, str]:
+ return prune_dict({"dag_id": self.dag_id, "run_id": str(self.run_id), "task_id": self.task_id})
Review Comment:
Isn't `run_id` a mostly meaningless, auto-generated identifier? If so, I think it would add little value to the metric while adding a lot of cardinality, driving costs up 📈 for users.
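To make the cardinality point concrete, here is a small sketch that counts distinct tag combinations, which typical tag-based metrics backends store as separate time series. The DAG, task IDs, and `run_id` strings are made-up illustration data, not real Airflow output.

```python
from __future__ import annotations

from itertools import product

# Hypothetical workload: one DAG with three tasks, run daily for 30 days.
dag_ids = ["example_dag"]
task_ids = ["extract", "transform", "load"]
run_ids = [f"scheduled__2023-04-{day:02d}" for day in range(1, 31)]


def count_series(include_run_id: bool) -> int:
    """Count unique tag sets that one task-instance metric would produce."""
    series: set[frozenset[tuple[str, str]]] = set()
    for dag_id, task_id, run_id in product(dag_ids, task_ids, run_ids):
        tags = {"dag_id": dag_id, "task_id": task_id}
        if include_run_id:
            tags["run_id"] = run_id
        series.add(frozenset(tags.items()))
    return len(series)


without_run_id = count_series(include_run_id=False)
with_run_id = count_series(include_run_id=True)
```

Without `run_id` the metric has one series per task; with it, the count grows with every new run and never stops growing.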
##########
airflow/dag_processing/processor.py:
##########
@@ -174,6 +174,7 @@ def _handle_dag_file_processing():
), Stats.timer() as timer:
_handle_dag_file_processing()
log.info("Processing %s took %.3f seconds", file_path, timer.duration)
+ Stats.timing("dag.processor.parse.time", dt=timer.duration, tags={"file_path": file_path})
Review Comment:
We already have something similar in
https://github.com/aws-mwaa/upstream-to-airflow/blob/7a8b3c40448455b5e4292937ec85606e2d99d54c/airflow/dag_processing/manager.py#L1044-L1045
I think this is going to send roughly the same information, isn't it?