[ 
https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097153#comment-17097153
 ] 

ASF GitHub Bot commented on AIRFLOW-249:
----------------------------------------

seanxwzhang commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r418404338



##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       sure, that makes more sense

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):

Review comment:
       sure

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):

Review comment:
       sure

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
       You are right, we don't. The original PR was written at a time when 
Airflow was still compatible with Python 2. :) 

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = 
dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Refactor the SLA mechanism
> --------------------------
>
>                 Key: AIRFLOW-249
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-249
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: dud
>            Priority: Major
>
> Hello
> I've noticed the SLA feature currently behaves as follows:
> - it doesn't work on DAGs scheduled @once or None because they have no 
> dag.following_schedule property
> - it keeps endlessly checking for SLA misses without ever taking end_date 
> into account. Worse, I noticed that emails are still being sent for runs that 
> will never happen because of end_date
> - it keeps checking for recent TIs even if SLA notifications have already 
> been sent for them
> - the SLA logic only fires after following_schedule + sla has 
> elapsed; in other words, one has to wait for the next TI before having a 
> chance of getting any email. Also, the email reports the dag.following_schedule 
> time (I guess because it is close to TI.start_date), but unfortunately that 
> matches neither what the task instances show nor the log filename
> - the SLA logic uses max(TI.execution_date) as the starting point of 
> its checks. This means that, for a DAG whose SLA is longer than its schedule 
> period, if half of the TIs run longer than expected, the misses will go 
> unnoticed. This can be demonstrated with a DAG like this one:
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 6, 16, 12, 20),
>     'email': my_email
>     'sla': timedelta(minutes=2),
> }
> dag = DAG('unnoticed_sla', default_args=default_args, 
> schedule_interval=timedelta(minutes=1))
> def alternating_sleep(**kwargs):
>     minute = kwargs['execution_date'].strftime("%M")
>     is_odd = int(minute) % 2
>     if is_odd:
>         sleep(300)
>     else:
>         sleep(10)
>     return True
> PythonOperator(
>     task_id='sla_miss',
>     python_callable=alternating_sleep,
>     provide_context=True,
>     dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism to address the above 
> points; please [have a look at 
> it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I ran some tests with this patch:
> - the fluctuating DAG shown above no longer makes Airflow skip any SLA event:
> {code}
>  task_id  |    dag_id     |   execution_date    | email_sent |         
> timestamp          | description | notification_sent 
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 
> 15:08:26.058631 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 
> 15:10:06.093253 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 
> 15:12:06.241773 |             | t
> {code}
> - on a normal DAG, the SLA is triggered more quickly:
> {code}
> // start_date = 2016-06-16 15:55:00
> // end_date = 2016-06-16 16:00:00
> // schedule_interval =  timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |    dag_id     |   execution_date    | email_sent |         
> timestamp          | description | notification_sent 
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t          | 2016-06-16 
> 15:58:11.832299 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t          | 2016-06-16 
> 15:59:09.663778 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t          | 2016-06-16 
> 16:00:13.651422 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t          | 2016-06-16 
> 16:01:08.576399 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t          | 2016-06-16 
> 16:02:08.523486 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t          | 2016-06-16 
> 16:03:08.538593 |             | t
> (6 rows)
> {code}
> than before (current master branch):
> {code}
> // start_date = 2016-06-16 15:40:00
> // end_date = 2016-06-16 15:45:00
> // schedule_interval =  timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |    dag_id     |   execution_date    | email_sent |         
> timestamp          | description | notification_sent 
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:41:00 | t          | 2016-06-16 
> 15:44:30.305287 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:42:00 | t          | 2016-06-16 
> 15:45:35.372118 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:43:00 | t          | 2016-06-16 
> 15:46:30.415744 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:44:00 | t          | 2016-06-16 
> 15:47:30.507345 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:45:00 | t          | 2016-06-16 
> 15:48:30.487742 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:46:00 | t          | 2016-06-16 
> 15:50:40.647373 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:47:00 | t          | 2016-06-16 
> 15:50:40.647373 |             | t
> {code}
> Please note that in this last case (current master), execution_date is equal 
> to dag.following_schedule, so the SLA fires after one extra 
> schedule_interval. Also note that SLA misses are still being triggered after 
> end_date, and that the timestamp column is updated several times.
> Please tell me what you think of my patch.
> dud



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to