This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 707bb0c  [AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)
707bb0c is described below

commit 707bb0c725fbc32929eea162993aa8fb9854fa9a
Author: Jonathan Stern <[email protected]>
AuthorDate: Sat May 16 12:53:12 2020 -0500

    [AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)
    
    
    
    * use preferred boolean check idiom
    
    Co-Authored-By: Jarek Potiuk <[email protected]>
    
    * add test coverage for AirflowFailException
    
    * add docs for some exception usage patterns
    
    * autoformatting
    
    * remove extraneous newline, poke travis build
    
    * clean up TaskInstance.handle_failure
    
    Try to reduce nesting and repetition of logic for different conditions.
    Also try to tighten up the scope of the exception handling ... it looks
    like the large block that catches an Exception and logs it as a failure
    to send an email may have been swallowing some TypeErrors coming out
    of trying to compose a log info message and calling strftime on
    start_date and end_date when they're set to None; this is why I've added
    lines in the test to set those values on the TaskInstance objects.
    
    * let sphinx generate docs for exceptions module
    
    * keep session kwarg last in handle_failure
    
    * explain allowed_top_level
    
    * add black-box tests for retry/fail immediately cases
    
    * don't lose safety measures in logging date attrs
    
    * fix flake8 too few blank lines
    
    * grammar nitpick
    
    * add import to AirflowFailException example
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow/exceptions.py             |   4 ++
 airflow/models/taskinstance.py    | 112 +++++++++++++++++++-------------------
 docs/autoapi_templates/index.rst  |  13 ++++-
 docs/concepts.rst                 |  45 +++++++++++++++
 docs/conf.py                      |   6 +-
 tests/models/test_taskinstance.py |  58 +++++++++++++++++++-
 6 files changed, 178 insertions(+), 60 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 6634d55..f0559a8 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -79,6 +79,10 @@ class AirflowSkipException(AirflowException):
     """Raise when the task should be skipped"""
 
 
+class AirflowFailException(AirflowException):
+    """Raise when the task should be failed without retrying"""
+
+
 class AirflowDagCycleException(AirflowException):
     """Raise when there is a cycle in Dag definition"""
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 3260580..2a89713 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -41,7 +41,8 @@ from sqlalchemy.sql.elements import BooleanClauseList
 from airflow import settings
 from airflow.configuration import conf
 from airflow.exceptions import (
-    AirflowException, AirflowRescheduleException, AirflowSkipException, AirflowTaskTimeout,
+    AirflowException, AirflowFailException, AirflowRescheduleException, AirflowSkipException,
+    AirflowTaskTimeout,
 )
 from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
 from airflow.models.log import Log
@@ -1067,6 +1068,10 @@ class TaskInstance(Base, LoggingMixin):
             self.refresh_from_db()
             self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
             return
+        except AirflowFailException as e:
+            self.refresh_from_db()
+            self.handle_failure(e, test_mode, context, force_fail=True)
+            raise
         except AirflowException as e:
             self.refresh_from_db()
             # for case when task is marked as success/failed externally
@@ -1177,7 +1182,7 @@ class TaskInstance(Base, LoggingMixin):
         self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
 
     @provide_session
-    def handle_failure(self, error, test_mode=None, context=None, session=None):
+    def handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None):
         if test_mode is None:
             test_mode = self.test_mode
         if context is None:
@@ -1198,64 +1203,51 @@ class TaskInstance(Base, LoggingMixin):
         if context is not None:
             context['exception'] = error
 
-        # Let's go deeper
-        try:
-            # Since this function is called only when the TaskInstance state is running,
-            # try_number contains the current try_number (not the next). We
-            # only mark task instance as FAILED if the next task instance
-            # try_number exceeds the max_tries.
-            if self.is_eligible_to_retry():
-                self.state = State.UP_FOR_RETRY
-                self.log.info('Marking task as UP_FOR_RETRY')
-                if task.email_on_retry and task.email:
-                    self.email_alert(error)
+        # Set state correctly and figure out how to log it,
+        # what callback to call if any, and how to decide whether to email
+
+        # Since this function is called only when the TaskInstance state is running,
+        # try_number contains the current try_number (not the next). We
+        # only mark task instance as FAILED if the next task instance
+        # try_number exceeds the max_tries ... or if force_fail is truthy
+
+        if force_fail or not self.is_eligible_to_retry():
+            self.state = State.FAILED
+            if force_fail:
+                log_message = "Immediate failure requested. Marking task as FAILED."
             else:
-                self.state = State.FAILED
-                if task.retries:
-                    self.log.info(
-                        'All retries failed; marking task as FAILED.'
-                        'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-                        self.dag_id,
-                        self.task_id,
-                        self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'execution_date') and self.execution_date else '',
-                        self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'start_date') and self.start_date else '',
-                        self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'end_date') and self.end_date else '')
-                else:
-                    self.log.info(
-                        'Marking task as FAILED.'
-                        'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-                        self.dag_id,
-                        self.task_id,
-                        self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'execution_date') and self.execution_date else '',
-                        self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'start_date') and self.start_date else '',
-                        self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'end_date') and self.end_date else '')
-                if task.email_on_failure and task.email:
-                    self.email_alert(error)
-        except Exception as e2:
-            self.log.error('Failed to send email to: %s', task.email)
-            self.log.exception(e2)
+                log_message = "Marking task as FAILED."
+            email_for_state = task.email_on_failure
+            callback = task.on_failure_callback
+        else:
+            self.state = State.UP_FOR_RETRY
+            log_message = "Marking task as UP_FOR_RETRY."
+            email_for_state = task.email_on_retry
+            callback = task.on_retry_callback
+
+        self.log.info(
+            '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
+            log_message,
+            self.dag_id,
+            self.task_id,
+            self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
+            self._safe_date('start_date', '%Y%m%dT%H%M%S'),
+            self._safe_date('end_date', '%Y%m%dT%H%M%S')
+        )
+        if email_for_state and task.email:
+            try:
+                self.email_alert(error)
+            except Exception as e2:
+                self.log.error('Failed to send email to: %s', task.email)
+                self.log.exception(e2)
 
         # Handling callbacks pessimistically
-        try:
-            if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
-                task.on_retry_callback(context)
-            if self.state == State.FAILED and task.on_failure_callback:
-                task.on_failure_callback(context)
-        except Exception as e3:
-            self.log.error("Failed at executing callback")
-            self.log.exception(e3)
+        if callback:
+            try:
+                callback(context)
+            except Exception as e3:
+                self.log.error("Failed at executing callback")
+                self.log.exception(e3)
 
         if not test_mode:
             session.merge(self)
@@ -1265,6 +1257,12 @@ class TaskInstance(Base, LoggingMixin):
         """Is task instance is eligible for retry"""
         return self.task.retries and self.try_number <= self.max_tries
 
+    def _safe_date(self, date_attr, fmt):
+        result = getattr(self, date_attr, None)
+        if result is not None:
+            return result.strftime(fmt)
+        return ''
+
     @provide_session
     def get_template_context(self, session=None) -> Dict[str, Any]:
         task = self.task
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index d3513b7..ed1a910 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -357,10 +357,21 @@ persisted in the database.
 
   airflow/models/index
 
+.. _pythonapi:exceptions:
+
+Exceptions
+----------
+
+.. toctree::
+  :includehidden:
+  :glob:
+  :maxdepth: 1
+
+  airflow/exceptions/index
 
 Secrets Backends
 ----------------
-Airflow uses relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
+Airflow relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
 All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.
 
 .. toctree::
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 60ceec6..c8d5942 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1216,6 +1216,51 @@ template string:
 See `Jinja documentation <https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_
 to find all available options.
 
+.. _exceptions:
+
+Exceptions
+==========
+
+Airflow defines a number of exceptions; most of these are used internally, but a few
+are relevant to authors of custom operators or python callables called from ``PythonOperator``
+tasks. Normally any exception raised from an ``execute`` method or python callable will either
+cause a task instance to fail if it is not configured to retry or has reached its limit on
+retry attempts, or to be marked as "up for retry". A few exceptions can be used when different
+behavior is desired:
+
+* ``AirflowSkipException`` can be raised to set the state of the current task instance to "skipped"
+* ``AirflowFailException`` can be raised to set the state of the current task to "failed" regardless
+  of whether there are any retry attempts remaining.
+
+This example illustrates some possibilities
+
+.. code:: python
+
+  from airflow.exceptions import AirflowFailException, AirflowSkipException
+
+  def fetch_data():
+      try:
+          data = get_some_data(get_api_key())
+          if not data:
+              # Set state to skipped and do not retry
+              # Downstream task behavior will be determined by trigger rules
+              raise AirflowSkipException("No data available.")
+      except Unauthorized:
+          # If we retry, our api key will still be bad, so don't waste time retrying!
+          # Set state to failed and move on
+          raise AirflowFailException("Our api key is bad!")
+      except TransientError:
+          print("Looks like there was a blip.")
+          # Raise the exception and let the task retry unless max attempts were reached
+          raise
+      handle(data)
+
+  task = PythonOperator(task_id="fetch_data", python_callable=fetch_data, retries=10)
+
+.. seealso::
+    - :ref:`List of Airflow exceptions <pythonapi:exceptions>`
+
+
 Packaged DAGs
 '''''''''''''
 While often you will specify DAGs in a single ``.py`` file it might sometimes
diff --git a/docs/conf.py b/docs/conf.py
index d69fd91..95d2f5e 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -211,9 +211,13 @@ exclude_patterns: List[str] = [
 ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
 
 # Generate top-level
+
+# do not exclude these top-level modules from the doc build:
+allowed_top_level = ("exceptions.py",)
+
 for path in glob(f"{ROOT_DIR}/airflow/*"):
     name = os.path.basename(path)
-    if os.path.isfile(path):
+    if os.path.isfile(path) and not path.endswith(allowed_top_level):
         exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}")
     browsable_packages = ["operators", "hooks", "sensors", "providers", "executors", "models", "secrets"]
     if os.path.isdir(path) and name not in browsable_packages:
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 631769e..52feaa5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -31,7 +31,7 @@ from parameterized import param, parameterized
 from sqlalchemy.orm.session import Session
 
 from airflow import models, settings
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException
 from airflow.models import (
     DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as TI, TaskReschedule, Variable,
 )
@@ -1514,6 +1514,62 @@ class TestTaskInstance(unittest.TestCase):
         context_arg_2 = mock_on_retry_2.call_args[0][0]
         assert context_arg_2 and "task_instance" in context_arg_2
 
+        # test the scenario where normally we would retry but have been asked to fail
+        mock_on_failure_3 = mock.MagicMock()
+        mock_on_retry_3 = mock.MagicMock()
+        task3 = DummyOperator(task_id="test_handle_failure_on_force_fail",
+                              on_failure_callback=mock_on_failure_3,
+                              on_retry_callback=mock_on_retry_3,
+                              retries=1,
+                              dag=dag)
+        ti3 = TI(task=task3, execution_date=start_date)
+        ti3.state = State.FAILED
+        ti3.handle_failure("test force_fail handling", force_fail=True)
+
+        context_arg_3 = mock_on_failure_3.call_args[0][0]
+        assert context_arg_3 and "task_instance" in context_arg_3
+        mock_on_retry_3.assert_not_called()
+
+    def test_does_not_retry_on_airflow_fail_exception(self):
+        def fail():
+            raise AirflowFailException("hopeless")
+
+        dag = models.DAG(dag_id='test_does_not_retry_on_airflow_fail_exception')
+        task = PythonOperator(
+            task_id='test_raise_airflow_fail_exception',
+            dag=dag,
+            python_callable=fail,
+            owner='airflow',
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+            retries=1
+        )
+        ti = TI(task=task, execution_date=timezone.utcnow())
+        try:
+            ti.run()
+        except AirflowFailException:
+            pass  # expected
+        self.assertEqual(State.FAILED, ti.state)
+
+    def test_retries_on_other_exceptions(self):
+        def fail():
+            raise AirflowException("maybe this will pass?")
+
+        dag = models.DAG(dag_id='test_retries_on_other_exceptions')
+        task = PythonOperator(
+            task_id='test_raise_other_exception',
+            dag=dag,
+            python_callable=fail,
+            owner='airflow',
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+            retries=1
+        )
+        ti = TI(task=task, execution_date=timezone.utcnow())
+        try:
+            ti.run()
+        except AirflowException:
+            pass  # expected
+        self.assertEqual(State.UP_FOR_RETRY, ti.state)
+
     def _env_var_check_callback(self):
         self.assertEqual('test_echo_env_variables', os.environ['AIRFLOW_CTX_DAG_ID'])
         self.assertEqual('hive_in_python_op', os.environ['AIRFLOW_CTX_TASK_ID'])

Reply via email to