[
https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095813#comment-17095813
]
ASF GitHub Bot commented on AIRFLOW-249:
----------------------------------------
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r417536580
##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
% (self.task_id, dag.dag_id))
self.sla = sla
self.execution_timeout = execution_timeout
+
+ # Warn about use of the deprecated SLA parameter
+ if sla and expected_finish:
+ warnings.warn(
+ "Both sla and expected_finish provided as task "
+ "parameters to {}; using expected_finish and ignoring "
+ "deprecated sla parameter.".format(self),
+ category=PendingDeprecationWarning
+ )
+ elif sla:
+ warnings.warn(
+ "sla is deprecated as a task parameter for {}; converting to "
+ "expected_finish instead.".format(self),
+ category=PendingDeprecationWarning
+ )
+ expected_finish = sla
+
+ # Set SLA parameters, batching invalid type messages into a
+ # single exception.
+ sla_param_errs: List = []
+ if expected_duration and not isinstance(expected_duration, timedelta):
+ sla_param_errs.append("expected_duration must be a timedelta, "
+ "got: {}".format(expected_duration))
+ if expected_start and not isinstance(expected_start, timedelta):
+ sla_param_errs.append("expected_start must be a timedelta, "
+ "got: {}".format(expected_start))
+ if expected_finish and not isinstance(expected_finish, timedelta):
+ sla_param_errs.append("expected_finish must be a timedelta, "
+ "got: {}".format(expected_finish))
+ if sla_param_errs:
+ raise AirflowException("Invalid SLA params were set! {}".format(
+ "; ".join(sla_param_errs)))
+
+ # If no exception has been raised, go ahead and set these.
+ self.expected_duration = expected_duration
+ self.expected_start = expected_start
+ self.expected_finish = expected_finish
+
+ # Warn the user if they've set any non-sensical parameter combinations
+ if self.expected_start and self.expected_finish \
+ and self.expected_start >= self.expected_finish:
+ self.log.warning(
+ "Task %s has an expected_start (%s) that occurs after its "
+ "expected_finish (%s), so it will always send an SLA "
+ "notification.",
+ self, self.expected_start, self.expected_finish
+ )
+
+ if self.expected_duration and self.expected_start \
Review comment:
looks like this if block has one more indentation then it should?
##########
File path: airflow/models/taskinstance.py
##########
@@ -412,6 +412,17 @@ def mark_success_url(self):
"&downstream=false"
).format(task_id=self.task_id, dag_id=self.dag_id, iso=iso)
+ @property
+ def details_url(self):
+ iso = quote(self.execution_date.isoformat())
+ base_url = conf.get('webserver', 'BASE_URL')
+ return base_url + (
+ "/task"
+ "?task_id={task_id}"
+ "&dag_id={dag_id}"
+ "&execution_date={iso}"
+ ).format(task_id=self.task_id, dag_id=self.dag_id, iso=iso)
Review comment:
minor nitpick, we can avoid creating an extra string at runtime with the
following code:
```python
return (
"{base_url}/task"
"?task_id={task_id}"
"&dag_id={dag_id}"
"&execution_date={iso}"
).format(base_url=base_url, task_id=self.task_id,
dag_id=self.dag_id, iso=iso)
```
##########
File path: airflow/models/baseoperator.py
##########
@@ -1187,6 +1306,12 @@ def get_serialized_fields(cls):
return cls.__serialized_fields
+ def has_slas(self):
Review comment:
nitpick, could you annotate return type for this method as well?
##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
"""
self.bulk_sync_to_db([self], sync_time, session)
+ @provide_session
+ def manage_slas(self, session=None):
+ """
+ Helper function to encapsulate the sequence of SLA operations.
+ """
+ # Create SlaMiss objects for the various types of SLA misses.
+ self.record_sla_misses(session=session)
+
+ # Collect pending SLA miss callbacks, either created immediately above
+ # or previously failed.
+ unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+ self.log.debug("Found %s unsent SLA miss notifications",
+ len(unsent_sla_misses))
+
+ # Trigger the SLA miss callbacks.
+ if unsent_sla_misses:
+ self.send_sla_notifications(unsent_sla_misses, session=session)
+
+ @provide_session
+ def record_sla_misses(self, session=None):
+ """
+ Create SLAMiss records for task instances associated with tasks in this
+ DAG. This involves walking forward to address potentially unscheduled
+ but expected executions, since new DAG runs may not get created if
+ there are concurrency restrictions on the scheduler. We still want to
+ receive SLA notifications in that scenario!
+ In the future, it would be preferable to have an SLA monitoring service
+ that runs independently from the scheduler, so that the service
+ responsible for scheduling work is not also responsible for determining
+ whether work is being scheduled.
+ """
+ self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+ # Get all current DagRuns.
+ scheduled_dagruns = DagRun.find(
+ dag_id=self.dag_id,
+ # TODO related to AIRFLOW-2236: determine how SLA misses should
+ # work for backfills and externally triggered
+ # DAG runs. At minimum they could have duration SLA misses.
+ external_trigger=False,
+ no_backfills=True,
+ # We aren't passing in the "state" parameter because we care about
+ # checking for SLAs whether the DAG run has failed, succeeded, or
+ # is still running.
+ session=session
+ )
+
+ # TODO: Is there a better limit here than "look at most recent 100"?
+ # Perhaps there should be a configurable lookback window on the DAG,
+ # for how many runs to consider SLA violations for.
+ scheduled_dagruns = scheduled_dagruns[-100:]
+ scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+ TI = TaskInstance
+ DR = DagRun
+
+ if scheduled_dagrun_ids:
+ # Find full, existing TIs for these DagRuns.
+ scheduled_tis = (
+ session.query(TI)
+ .outerjoin(DR, and_(
+ DR.dag_id == TI.dag_id,
+ DR.execution_date == TI.execution_date))
+ # Only look at TIs for this DAG.
+ .filter(TI.dag_id == self.dag_id)
+ # Only look at TIs that *still* exist in this DAG.
+ .filter(TI.task_id.in_(self.task_ids))
+ # Don't look for success/skip TIs. We check SLAs often, so
+ # there's little chance that a TI switches to successful
+ # after an SLA miss but before we notice; and this should
+ # be a major perf boost (since most TIs are successful or
+ # skipped).
+ .filter(or_(
+ # has to be written this way to account for sql nulls
+ TI.state == None, # noqa E711
+ not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+ ))
+ # Only look at specified DagRuns
+ .filter(DR.id.in_(scheduled_dagrun_ids))
+ # If the DAGRun is SUCCEEDED, then everything has gone
+ # according to plan. But if it's FAILED, someone may be
+ # coming to fix it, and SLAs for tasks in it will still
+ # matter.
+ .filter(DR.state != State.SUCCESS)
+ .order_by(asc(DR.execution_date))
+ .all()
+ )
+ else:
+ scheduled_tis = []
+
+ self.log.debug(
+ "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+ len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
Review comment:
for logging, please don't use format, it will create the string at
runtime regardless of logging level. always use `%s` with parameters passed in
as arguments to logging call.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Refactor the SLA mechanism
> --------------------------
>
> Key: AIRFLOW-249
> URL: https://issues.apache.org/jira/browse/AIRFLOW-249
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: dud
> Priority: Major
>
> Hello
> I've noticed the SLA feature is currently behaving as follow :
> - it doesn't work on DAG scheduled @once or None because they have no
> dag.followwing_schedule property
> - it keeps endlessly checking for SLA misses without ever worrying about any
> end_date. Worse I noticed that emails are still being sent for runs that are
> never happening because of end_date
> - it keeps checking for recent TIs even if SLA notification has been already
> been sent for them
> - the SLA logic is only being fired after following_schedule + sla has
> elapsed, in other words one has to wait for the next TI before having a
> chance of getting any email. Also the email reports dag.following_schedule
> time (I guess because it is close of TI.start_date), but unfortunately that
> doesn't match what the task instances shows nor the log filename
> - the SLA logic is based on max(TI.execution_date) for the starting point of
> its checks, that means that for a DAG whose SLA is longer than its schedule
> period if half of the TIs are running longer than expected it will go
> unnoticed. This could be demonstrated with a DAG like this one :
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': False,
> 'start_date': datetime(2016, 6, 16, 12, 20),
> 'email': my_email
> 'sla': timedelta(minutes=2),
> }
> dag = DAG('unnoticed_sla', default_args=default_args,
> schedule_interval=timedelta(minutes=1))
> def alternating_sleep(**kwargs):
> minute = kwargs['execution_date'].strftime("%M")
> is_odd = int(minute) % 2
> if is_odd:
> sleep(300)
> else:
> sleep(10)
> return True
> PythonOperator(
> task_id='sla_miss',
> python_callable=alternating_sleep,
> provide_context=True,
> dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism by addressing the above
> points., please [have a look on
> it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I made some tests with this patch :
> - the fluctuent DAG shown above no longer make Airflow skip any SLA event :
> {code}
> task_id | dag_id | execution_date | email_sent |
> timestamp | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
> sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t | 2016-06-16
> 15:08:26.058631 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t | 2016-06-16
> 15:10:06.093253 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t | 2016-06-16
> 15:12:06.241773 | | t
> {code}
> - on a normal DAG, the SLA is being triggred more quickly :
> {code}
> // start_date = 2016-06-16 15:55:00
> // end_date = 2016-06-16 16:00:00
> // schedule_interval = timedelta(minutes=1)
> // sla = timedelta(minutes=2)
> task_id | dag_id | execution_date | email_sent |
> timestamp | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
> sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t | 2016-06-16
> 15:58:11.832299 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t | 2016-06-16
> 15:59:09.663778 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t | 2016-06-16
> 16:00:13.651422 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t | 2016-06-16
> 16:01:08.576399 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t | 2016-06-16
> 16:02:08.523486 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t | 2016-06-16
> 16:03:08.538593 | | t
> (6 rows)
> {code}
> than before (current master branch) :
> {code}
> // start_date = 2016-06-16 15:40:00
> // end_date = 2016-06-16 15:45:00
> // schedule_interval = timedelta(minutes=1)
> // sla = timedelta(minutes=2)
> task_id | dag_id | execution_date | email_sent |
> timestamp | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
> sla_miss | dag_sla_miss1 | 2016-06-16 15:41:00 | t | 2016-06-16
> 15:44:30.305287 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:42:00 | t | 2016-06-16
> 15:45:35.372118 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:43:00 | t | 2016-06-16
> 15:46:30.415744 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:44:00 | t | 2016-06-16
> 15:47:30.507345 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:45:00 | t | 2016-06-16
> 15:48:30.487742 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:46:00 | t | 2016-06-16
> 15:50:40.647373 | | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:47:00 | t | 2016-06-16
> 15:50:40.647373 | | t
> {code}
> Please note that in this last case (current master) execution_date is equal
> to dag.following_schedule, so SLA is being fired after one extra
> schedule_interval. Also note that SLA are still being triggered after
> end_date. Also note the timestamp column being updated seveal time.
> Please tell me what do you think about my patch.
> dud
--
This message was sent by Atlassian Jira
(v8.3.4#803005)