This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 5e94fe9  [AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)
5e94fe9 is described below

commit 5e94fe9732e31c8ff1af44d2feff0d0f33bcbe25
Author: Jonathan Stern <jonathan.st...@iprospect.com>
AuthorDate: Sat May 16 12:53:12 2020 -0500

    [AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)
    
    * use preferred boolean check idiom
    
    Co-Authored-By: Jarek Potiuk <ja...@potiuk.com>
    
    * add test coverage for AirflowFailException
    
    * add docs for some exception usage patterns
    
    * autoformatting
    
    * remove extraneous newline, poke travis build
    
    * clean up TaskInstance.handle_failure
    
    Try to reduce nesting and repetition of logic for different conditions.
    Also try to tighten up the scope of the exception handling ... it looks
    like the large block that catches an Exception and logs it as a failure
    to send an email may have been swallowing some TypeErrors coming out
    of trying to compose a log info message and calling strftime on
    start_date and end_date when they're set to None; this is why I've added
    lines in the test to set those values on the TaskInstance objects.
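    
    A minimal sketch of that hazard (simplified names; send_email is a
    hypothetical stand-in for the real email_alert call):
    
        start_date = None  # date attributes may legitimately be unset
        try:
            # composing the log line blows up before any email is attempted ...
            send_email('start_date=%s' % start_date.strftime('%Y%m%dT%H%M%S'))
        except Exception as exc:
            # ... yet the old broad handler reports it as an email failure
            print('Failed to send email: %s' % exc)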
    
    * let sphinx generate docs for exceptions module
    
    * keep session kwarg last in handle_failure
    
    * explain allowed_top_level
    
    * add black-box tests for retry/fail immediately cases
    
    * don't lose safety measures in logging date attrs
    
    * fix flake8 too few blank lines
    
    * grammar nitpick
    
    * add import to AirflowFailException example
    
    Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
    (cherry picked from commit 707bb0c725fbc32929eea162993aa8fb9854fa9a)
---
 airflow/exceptions.py             |   4 ++
 airflow/models/taskinstance.py    | 112 +++++++++++++++++++-------------------
 docs/autoapi_templates/index.rst  |  13 ++++-
 docs/concepts.rst                 |  45 +++++++++++++++
 docs/conf.py                      |   1 -
 tests/models/test_taskinstance.py |  58 +++++++++++++++++++-
 6 files changed, 173 insertions(+), 60 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 10d310f..badf156 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -75,6 +75,10 @@ class AirflowSkipException(AirflowException):
     """Raise when the task should be skipped"""
 
 
+class AirflowFailException(AirflowException):
+    """Raise when the task should be failed without retrying"""
+
+
 class AirflowDagCycleException(AirflowException):
     """Raise when there is a cycle in Dag definition"""
 
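Since AirflowFailException subclasses AirflowException, a generic
"except AirflowException" clause would also catch it, so the new handler
added to taskinstance.py below has to be listed before the generic one.
A minimal standalone sketch of that ordering rule:

    from airflow.exceptions import AirflowException, AirflowFailException

    try:
        raise AirflowFailException("fail without retrying")
    except AirflowFailException:
        # the more specific subclass must come first to be reached
        print("force-fail path")
    except AirflowException:
        # listed second, or it would swallow AirflowFailException as well
        print("generic failure path")
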
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 242cfe3..0e5d6e3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -43,7 +43,8 @@ from sqlalchemy.orm.session import Session
 from airflow import settings
 from airflow.configuration import conf
 from airflow.exceptions import (
-    AirflowException, AirflowTaskTimeout, AirflowSkipException, AirflowRescheduleException
+    AirflowException, AirflowFailException, AirflowRescheduleException, AirflowSkipException,
+    AirflowTaskTimeout,
 )
 from airflow.models.base import Base, ID_LEN
 from airflow.models.log import Log
@@ -1026,6 +1027,10 @@ class TaskInstance(Base, LoggingMixin):
             self.refresh_from_db()
             self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
             return
+        except AirflowFailException as e:
+            self.refresh_from_db()
+            self.handle_failure(e, test_mode, context, force_fail=True)
+            raise
         except AirflowException as e:
             self.refresh_from_db()
             # for case when task is marked as success/failed externally
@@ -1136,7 +1141,7 @@ class TaskInstance(Base, LoggingMixin):
         self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
 
     @provide_session
-    def handle_failure(self, error, test_mode=None, context=None, session=None):
+    def handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None):
         if test_mode is None:
             test_mode = self.test_mode
         if context is None:
@@ -1157,64 +1162,51 @@ class TaskInstance(Base, LoggingMixin):
         if context is not None:
             context['exception'] = error
 
-        # Let's go deeper
-        try:
-            # Since this function is called only when the TI state is running,
-            # try_number contains the current try_number (not the next). We
-            # only mark task instance as FAILED if the next task instance
-            # try_number exceeds the max_tries.
-            if self.is_eligible_to_retry():
-                self.state = State.UP_FOR_RETRY
-                self.log.info('Marking task as UP_FOR_RETRY')
-                if task.email_on_retry and task.email:
-                    self.email_alert(error)
+        # Set state correctly and figure out how to log it,
+        # what callback to call if any, and how to decide whether to email
+
+        # Since this function is called only when the TaskInstance state is running,
+        # try_number contains the current try_number (not the next). We
+        # only mark task instance as FAILED if the next task instance
+        # try_number exceeds the max_tries ... or if force_fail is truthy
+
+        if force_fail or not self.is_eligible_to_retry():
+            self.state = State.FAILED
+            if force_fail:
+                log_message = "Immediate failure requested. Marking task as FAILED."
             else:
-                self.state = State.FAILED
-                if task.retries:
-                    self.log.info(
-                        'All retries failed; marking task as FAILED.'
-                        'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-                        self.dag_id,
-                        self.task_id,
-                        self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'execution_date') and self.execution_date else '',
-                        self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'start_date') and self.start_date else '',
-                        self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'end_date') and self.end_date else '')
-                else:
-                    self.log.info(
-                        'Marking task as FAILED.'
-                        'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-                        self.dag_id,
-                        self.task_id,
-                        self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'execution_date') and self.execution_date else '',
-                        self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'start_date') and self.start_date else '',
-                        self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'end_date') and self.end_date else '')
-                if task.email_on_failure and task.email:
-                    self.email_alert(error)
-        except Exception as e2:
-            self.log.error('Failed to send email to: %s', task.email)
-            self.log.exception(e2)
+                log_message = "Marking task as FAILED."
+            email_for_state = task.email_on_failure
+            callback = task.on_failure_callback
+        else:
+            self.state = State.UP_FOR_RETRY
+            log_message = "Marking task as UP_FOR_RETRY."
+            email_for_state = task.email_on_retry
+            callback = task.on_retry_callback
+
+        self.log.info(
+            '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
+            log_message,
+            self.dag_id,
+            self.task_id,
+            self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
+            self._safe_date('start_date', '%Y%m%dT%H%M%S'),
+            self._safe_date('end_date', '%Y%m%dT%H%M%S')
+        )
+        if email_for_state and task.email:
+            try:
+                self.email_alert(error)
+            except Exception as e2:
+                self.log.error('Failed to send email to: %s', task.email)
+                self.log.exception(e2)
 
         # Handling callbacks pessimistically
-        try:
-            if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
-                task.on_retry_callback(context)
-            if self.state == State.FAILED and task.on_failure_callback:
-                task.on_failure_callback(context)
-        except Exception as e3:
-            self.log.error("Failed at executing callback")
-            self.log.exception(e3)
+        if callback:
+            try:
+                callback(context)
+            except Exception as e3:
+                self.log.error("Failed at executing callback")
+                self.log.exception(e3)
 
         if not test_mode:
             session.merge(self)
@@ -1224,6 +1216,12 @@ class TaskInstance(Base, LoggingMixin):
         """Is task instance is eligible for retry"""
         return self.task.retries and self.try_number <= self.max_tries
 
+    def _safe_date(self, date_attr, fmt):
+        result = getattr(self, date_attr, None)
+        if result is not None:
+            return result.strftime(fmt)
+        return ''
+
     @provide_session
     def get_template_context(self, session=None):
         task = self.task
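
The rewritten branch above collapses the old nested try blocks into a single
decision; a standalone sketch of just that logic (names simplified from
handle_failure):

    def decide_state(force_fail, eligible_to_retry):
        # force_fail short-circuits retry eligibility entirely
        if force_fail or not eligible_to_retry:
            return "FAILED"
        return "UP_FOR_RETRY"

    assert decide_state(force_fail=True, eligible_to_retry=True) == "FAILED"
    assert decide_state(force_fail=False, eligible_to_retry=True) == "UP_FOR_RETRY"
    assert decide_state(force_fail=False, eligible_to_retry=False) == "FAILED"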
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index d7f7a47..1be7ee7 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -119,6 +119,17 @@ persisted in the database.
 
   airflow/models/index
 
+.. _pythonapi:exceptions:
+
+Exceptions
+----------
+
+.. toctree::
+  :includehidden:
+  :glob:
+  :maxdepth: 1
+
+  airflow/exceptions/index
 
 Core and community package
 --------------------------
@@ -131,7 +142,7 @@ added only to the contrib package.
 
 Secrets Backends
 ----------------
-Airflow uses relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
+Airflow relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
 All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.
 
 .. toctree::
diff --git a/docs/concepts.rst b/docs/concepts.rst
index b365aba..edd6e5d 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1223,6 +1223,51 @@ template string:
 See `Jinja documentation 
<https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_
 to find all available options.
 
+.. _exceptions:
+
+Exceptions
+==========
+
+Airflow defines a number of exceptions; most of these are used internally, but a few
+are relevant to authors of custom operators or python callables called from ``PythonOperator``
+tasks. Normally any exception raised from an ``execute`` method or python callable will either
+cause a task instance to fail if it is not configured to retry or has reached its limit on
+retry attempts, or to be marked as "up for retry". A few exceptions can be used when different
+behavior is desired:
+
+* ``AirflowSkipException`` can be raised to set the state of the current task instance to "skipped"
+* ``AirflowFailException`` can be raised to set the state of the current task to "failed" regardless
+  of whether there are any retry attempts remaining.
+
+This example illustrates some possibilities
+
+.. code:: python
+
+  from airflow.exceptions import AirflowFailException, AirflowSkipException
+
+  def fetch_data():
+      try:
+          data = get_some_data(get_api_key())
+          if not data:
+              # Set state to skipped and do not retry
+              # Downstream task behavior will be determined by trigger rules
+              raise AirflowSkipException("No data available.")
+      except Unauthorized:
+          # If we retry, our api key will still be bad, so don't waste time retrying!
+          # Set state to failed and move on
+          raise AirflowFailException("Our api key is bad!")
+      except TransientError:
+          print("Looks like there was a blip.")
+          # Raise the exception and let the task retry unless max attempts were reached
+          raise
+      handle(data)
+
+  task = PythonOperator(task_id="fetch_data", python_callable=fetch_data, retries=10)
+
+.. seealso::
+    - :ref:`List of Airflow exceptions <pythonapi:exceptions>`
+
+
 Packaged DAGs
 '''''''''''''
 While often you will specify DAGs in a single ``.py`` file it might sometimes
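
The AirflowSkipException comment in the docs example above notes that
downstream behavior is determined by trigger rules: with the default
"all_success" rule, a skipped upstream also skips its dependents. A hedged
sketch of opting out of that, assuming the example's task and a dag object
are in scope (the report task is illustrative, not part of the patch):

    from airflow.operators.dummy_operator import DummyOperator

    # runs even when fetch_data is skipped, as long as nothing upstream failed
    report = DummyOperator(
        task_id="report",
        trigger_rule="none_failed",  # default is "all_success"
        dag=dag,
    )
    task >> report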
diff --git a/docs/conf.py b/docs/conf.py
index 67010b3..5f2a113 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -198,7 +198,6 @@ exclude_patterns = [
     '_api/airflow/dag',
     '_api/airflow/default_login',
     '_api/airflow/example_dags',
-    '_api/airflow/exceptions',
     '_api/airflow/index.rst',
     '_api/airflow/jobs',
     '_api/airflow/lineage',
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 4534a07..0c416a4 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -31,7 +31,7 @@ from sqlalchemy.orm.session import Session
 from airflow import models, settings
 from airflow.configuration import conf
 from airflow.contrib.sensors.python_sensor import PythonSensor
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException
 from airflow.models import (
     DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as TI, TaskReschedule, Variable,
 )
@@ -1408,6 +1408,22 @@ class TaskInstanceTest(unittest.TestCase):
         context_arg_2 = mock_on_retry_2.call_args[0][0]
         assert context_arg_2 and "task_instance" in context_arg_2
 
+        # test the scenario where normally we would retry but have been asked to fail
+        mock_on_failure_3 = mock.MagicMock()
+        mock_on_retry_3 = mock.MagicMock()
+        task3 = DummyOperator(task_id="test_handle_failure_on_force_fail",
+                              on_failure_callback=mock_on_failure_3,
+                              on_retry_callback=mock_on_retry_3,
+                              retries=1,
+                              dag=dag)
+        ti3 = TI(task=task3, execution_date=start_date)
+        ti3.state = State.FAILED
+        ti3.handle_failure("test force_fail handling", force_fail=True)
+
+        context_arg_3 = mock_on_failure_3.call_args[0][0]
+        assert context_arg_3 and "task_instance" in context_arg_3
+        mock_on_retry_3.assert_not_called()
+
     @parameterized.expand(
         [
             ('{{ var.value.a_variable }}', 'a test value'),
@@ -1474,6 +1490,46 @@ class TaskInstanceTest(unittest.TestCase):
         with self.assertRaises(KeyError):
             task.render_template('{{ var.json.get("missing_variable") }}', context)
 
+    def test_does_not_retry_on_airflow_fail_exception(self):
+        def fail():
+            raise AirflowFailException("hopeless")
+
+        dag = models.DAG(dag_id='test_does_not_retry_on_airflow_fail_exception')
+        task = PythonOperator(
+            task_id='test_raise_airflow_fail_exception',
+            dag=dag,
+            python_callable=fail,
+            owner='airflow',
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+            retries=1
+        )
+        ti = TI(task=task, execution_date=timezone.utcnow())
+        try:
+            ti.run()
+        except AirflowFailException:
+            pass  # expected
+        self.assertEqual(State.FAILED, ti.state)
+
+    def test_retries_on_other_exceptions(self):
+        def fail():
+            raise AirflowException("maybe this will pass?")
+
+        dag = models.DAG(dag_id='test_retries_on_other_exceptions')
+        task = PythonOperator(
+            task_id='test_raise_other_exception',
+            dag=dag,
+            python_callable=fail,
+            owner='airflow',
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+            retries=1
+        )
+        ti = TI(task=task, execution_date=timezone.utcnow())
+        try:
+            ti.run()
+        except AirflowException:
+            pass  # expected
+        self.assertEqual(State.UP_FOR_RETRY, ti.state)
+
     @parameterized.expand([
         (True, ),
         (False, )
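
The try/except-pass pattern in the new tests works; for reference, unittest's
assertRaises context manager expresses the same black-box check more tersely
(a sketch against the same ti as above):

    with self.assertRaises(AirflowFailException):
        ti.run()
    self.assertEqual(State.FAILED, ti.state)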
