houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r417546005
##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
"""
self.bulk_sync_to_db([self], sync_time, session)
+ @provide_session
+ def manage_slas(self, session=None):
+ """
+ Helper function to encapsulate the sequence of SLA operations.
+ """
+ # Create SlaMiss objects for the various types of SLA misses.
+ self.record_sla_misses(session=session)
+
+ # Collect pending SLA miss callbacks, either created immediately above
+ # or previously failed.
+ unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+ self.log.debug("Found %s unsent SLA miss notifications",
+ len(unsent_sla_misses))
+
+ # Trigger the SLA miss callbacks.
+ if unsent_sla_misses:
+ self.send_sla_notifications(unsent_sla_misses, session=session)
+
+ @provide_session
+ def record_sla_misses(self, session=None):
+ """
+ Create SLAMiss records for task instances associated with tasks in this
+ DAG. This involves walking forward to address potentially unscheduled
+ but expected executions, since new DAG runs may not get created if
+ there are concurrency restrictions on the scheduler. We still want to
+ receive SLA notifications in that scenario!
+ In the future, it would be preferable to have an SLA monitoring service
+ that runs independently from the scheduler, so that the service
+ responsible for scheduling work is not also responsible for determining
+ whether work is being scheduled.
+ """
+ self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+ # Get all current DagRuns.
+ scheduled_dagruns = DagRun.find(
+ dag_id=self.dag_id,
+ # TODO related to AIRFLOW-2236: determine how SLA misses should
+ # work for backfills and externally triggered
+ # DAG runs. At minimum they could have duration SLA misses.
+ external_trigger=False,
+ no_backfills=True,
+ # We aren't passing in the "state" parameter because we care about
+ # checking for SLAs whether the DAG run has failed, succeeded, or
+ # is still running.
+ session=session
+ )
+
+ # TODO: Is there a better limit here than "look at most recent 100"?
+ # Perhaps there should be a configurable lookback window on the DAG,
+ # for how many runs to consider SLA violations for.
+ scheduled_dagruns = scheduled_dagruns[-100:]
Review comment:
yeah, agreed. Perhaps for every DAG run created, we can create a to-check
entry in a separate DAG SLA table. Every checked DAG run can then be removed
from that table to keep the size growth under control.
If we are not going to implement the filter optimization within this PR, we
should at least move the "grab the most recent 100 entries" logic from Python into a DB query.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]