Eronarn commented on a change in pull request #3584: [AIRFLOW-249] Refactor the
SLA mechanism
URL: https://github.com/apache/incubator-airflow/pull/3584#discussion_r224153565
##########
File path: airflow/models.py
##########
@@ -4328,6 +4442,168 @@ def _test_cycle_helper(self, visit_map, task_id):
visit_map[task_id] = DagBag.CYCLE_DONE
+ @provide_session
+ def manage_slas(self, session=None):
+ """
+ Helper function to encapsulate the sequence of SLA operations.
+ """
+ # Create SlaMiss objects for the various types of SLA misses.
+ self.record_sla_misses(session=session)
+
+ # Collect pending SLA miss callbacks, either created immediately above
+ # or previously failed.
+ unsent_sla_misses = self.get_sla_notifications(session=session)
+ self.log.debug("Found %s unsent SLA miss notifications",
+ len(unsent_sla_misses))\
+
+ # Trigger the SLA miss callbacks.
+ if unsent_sla_misses:
+ self.send_sla_notifications(unsent_sla_misses, session=session)
+
+ @provide_session
+ def record_sla_misses(self, session=None):
+ """
+ Create SLAMiss records for task instances associated with tasks in this
+ DAG. This involves walking forward to address potentially unscheduled
+ but expected executions, since new DAG runs may not get created if
+ there are concurrency restrictions on the scheduler. We still want to
+ receive SLA notifications in that scenario!
+ """
+ self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+ # Get all current DagRuns.
+ scheduled_dagruns = DagRun.find(
+ dag_id=self.dag_id,
+ # TODO: Implement SLA misses for backfills and externally triggered
Review comment:
Added a comment that it is related to AIRFLOW-2236
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services