[ 
https://issues.apache.org/jira/browse/AIRFLOW-6184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995695#comment-16995695
 ] 

Dan Davydov commented on AIRFLOW-6184:
--------------------------------------

This might not be necessary, using the existing on_failure/on_success callbacks 
might be satisfactory, e.g. we can add a utility function like below (credit to 
this ticket's reporter for writing it):
{code:java}
def email_on_failure_callback(emails, retries=3, wait_seconds=1):
  """Callback generator to send emails on DAG failure. Designed to be used with 
`on_failure_callback`.
  To avoid blocking the scheduler, exceptions are catched and logged. To see 
the logs access the
 Args:
      emails (list,str): list of emails to send the email to (comma or 
semicolon delimited)
      retries (int): Number of times to try to send the email.
      wait_seconds (int): Seconds to wait between retries.
Returns:
          Callback function to send email to specified email. Templated email 
includes important information and links to the instance.
  """
    @_catch_exception
    @retry(
    stop=stop_after_attempt(retries),
    wait=wait_fixed(wait_seconds),
    before=before_log(LOGGER, logging.DEBUG),
    after=after_log(LOGGER, logging.DEBUG)
    def notify_failure(context):
      LOGGER.info('Attempting to send email notification for failure to 
{}'.format(emails))
      iso = quote(context.get('execution_date').isoformat())
      base_url = context.get('conf').get('webserver', 'BASE_URL')    
context.update({'dag_url': base_url + 
'/admin/airflow/graph?execution_date={iso}&dag_id={dag_id}'.format(
      iso=iso, dag_id=context.get('dag_run').dag_id
      )})
      html_content = """
      DAG {dag_run.dag_id} has failed<br>
      <br>
      Execution date: {execution_date}<br>
      First task instance log: <a href="{ti.log_url}">Log Link</a><br>
      <br>
      <a href="{dag_url}">Open failed DAGrun</a><br>
      """.format(**context)
    subject = 'Airflow alert: {dag_run.dag_id}'.format(**context)
    email.send_email(emails, subject, html_content)

 {code}
And then users can just do the following in a DAG constructor:
{code:java}
 with DAG('monitoring_example', 
on_failure_callback=email_on_failure_callback('[email protected]')) as dag: 
{code}
 

But https://issues.apache.org/jira/browse/AIRFLOW-6253 should be resolved first 
so that the callbacks don't affect scheduler performance

> Add `email_on_retry` and `email_on_failure` for DAGs
> ----------------------------------------------------
>
>                 Key: AIRFLOW-6184
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6184
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: DagRun, scheduler
>    Affects Versions: 1.10.7
>            Reporter: Gerard Casas Saez
>            Priority: Minor
>
> Currently, there's support for  `email_on_retry` and `email_on_failure` on 
> operators.
> As a user I would like to have an easy way to be notified if the DAG is 
> marked as failed. As I may want to get paged if so. 
>  
> Adding a notification operator may fail if the workers are not getting jobs 
> scheduled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to