This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 707bb0c [AIRFLOW-6535] Add AirflowFailException to fail without any
retry (#7133)
707bb0c is described below
commit 707bb0c725fbc32929eea162993aa8fb9854fa9a
Author: Jonathan Stern <[email protected]>
AuthorDate: Sat May 16 12:53:12 2020 -0500
[AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)
* use preferred boolean check idiom
Co-Authored-By: Jarek Potiuk <[email protected]>
* add test coverage for AirflowFailException
* add docs for some exception usage patterns
* autoformatting
* remove extraneous newline, poke travis build
* clean up TaskInstance.handle_failure
Reduce nesting and repetition of logic for the different conditions.
Also tighten up the scope of the exception handling: it looks like the
large block that catches an Exception and logs it as a failure to send
an email may have been swallowing some TypeErrors raised while composing
the log info message, when strftime was called on start_date and
end_date while they were set to None. This is why I've added lines in
the test to set those values on the TaskInstance objects.
* let sphinx generate docs for exceptions module
* keep session kwarg last in handle_failure
* explain allowed_top_level
* add black-box tests for retry/fail immediately cases
* don't lose safety measures in logging date attrs
* fix flake8 too few blank lines
* grammar nitpick
* add import to AirflowFailException example
Co-authored-by: Jarek Potiuk <[email protected]>
---
airflow/exceptions.py | 4 ++
airflow/models/taskinstance.py | 112 +++++++++++++++++++-------------------
docs/autoapi_templates/index.rst | 13 ++++-
docs/concepts.rst | 45 +++++++++++++++
docs/conf.py | 6 +-
tests/models/test_taskinstance.py | 58 +++++++++++++++++++-
6 files changed, 178 insertions(+), 60 deletions(-)
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 6634d55..f0559a8 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -79,6 +79,10 @@ class AirflowSkipException(AirflowException):
"""Raise when the task should be skipped"""
+class AirflowFailException(AirflowException):
+ """Raise when the task should be failed without retrying"""
+
+
class AirflowDagCycleException(AirflowException):
"""Raise when there is a cycle in Dag definition"""
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 3260580..2a89713 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -41,7 +41,8 @@ from sqlalchemy.sql.elements import BooleanClauseList
from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
- AirflowException, AirflowRescheduleException, AirflowSkipException,
AirflowTaskTimeout,
+ AirflowException, AirflowFailException, AirflowRescheduleException,
AirflowSkipException,
+ AirflowTaskTimeout,
)
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.models.log import Log
@@ -1067,6 +1068,10 @@ class TaskInstance(Base, LoggingMixin):
self.refresh_from_db()
self._handle_reschedule(actual_start_date, reschedule_exception,
test_mode, context)
return
+ except AirflowFailException as e:
+ self.refresh_from_db()
+ self.handle_failure(e, test_mode, context, force_fail=True)
+ raise
except AirflowException as e:
self.refresh_from_db()
# for case when task is marked as success/failed externally
@@ -1177,7 +1182,7 @@ class TaskInstance(Base, LoggingMixin):
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
@provide_session
- def handle_failure(self, error, test_mode=None, context=None,
session=None):
+ def handle_failure(self, error, test_mode=None, context=None,
force_fail=False, session=None):
if test_mode is None:
test_mode = self.test_mode
if context is None:
@@ -1198,64 +1203,51 @@ class TaskInstance(Base, LoggingMixin):
if context is not None:
context['exception'] = error
- # Let's go deeper
- try:
- # Since this function is called only when the TaskInstance state
is running,
- # try_number contains the current try_number (not the next). We
- # only mark task instance as FAILED if the next task instance
- # try_number exceeds the max_tries.
- if self.is_eligible_to_retry():
- self.state = State.UP_FOR_RETRY
- self.log.info('Marking task as UP_FOR_RETRY')
- if task.email_on_retry and task.email:
- self.email_alert(error)
+ # Set state correctly and figure out how to log it,
+ # what callback to call if any, and how to decide whether to email
+
+ # Since this function is called only when the TaskInstance state is
running,
+ # try_number contains the current try_number (not the next). We
+ # only mark task instance as FAILED if the next task instance
+ # try_number exceeds the max_tries ... or if force_fail is truthy
+
+ if force_fail or not self.is_eligible_to_retry():
+ self.state = State.FAILED
+ if force_fail:
+ log_message = "Immediate failure requested. Marking task as
FAILED."
else:
- self.state = State.FAILED
- if task.retries:
- self.log.info(
- 'All retries failed; marking task as FAILED.'
- 'dag_id=%s, task_id=%s, execution_date=%s,
start_date=%s, end_date=%s',
- self.dag_id,
- self.task_id,
- self.execution_date.strftime('%Y%m%dT%H%M%S') if
hasattr(
- self,
- 'execution_date') and self.execution_date else '',
- self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
- self,
- 'start_date') and self.start_date else '',
- self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
- self,
- 'end_date') and self.end_date else '')
- else:
- self.log.info(
- 'Marking task as FAILED.'
- 'dag_id=%s, task_id=%s, execution_date=%s,
start_date=%s, end_date=%s',
- self.dag_id,
- self.task_id,
- self.execution_date.strftime('%Y%m%dT%H%M%S') if
hasattr(
- self,
- 'execution_date') and self.execution_date else '',
- self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
- self,
- 'start_date') and self.start_date else '',
- self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
- self,
- 'end_date') and self.end_date else '')
- if task.email_on_failure and task.email:
- self.email_alert(error)
- except Exception as e2:
- self.log.error('Failed to send email to: %s', task.email)
- self.log.exception(e2)
+ log_message = "Marking task as FAILED."
+ email_for_state = task.email_on_failure
+ callback = task.on_failure_callback
+ else:
+ self.state = State.UP_FOR_RETRY
+ log_message = "Marking task as UP_FOR_RETRY."
+ email_for_state = task.email_on_retry
+ callback = task.on_retry_callback
+
+ self.log.info(
+ '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s,
end_date=%s',
+ log_message,
+ self.dag_id,
+ self.task_id,
+ self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
+ self._safe_date('start_date', '%Y%m%dT%H%M%S'),
+ self._safe_date('end_date', '%Y%m%dT%H%M%S')
+ )
+ if email_for_state and task.email:
+ try:
+ self.email_alert(error)
+ except Exception as e2:
+ self.log.error('Failed to send email to: %s', task.email)
+ self.log.exception(e2)
# Handling callbacks pessimistically
- try:
- if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
- task.on_retry_callback(context)
- if self.state == State.FAILED and task.on_failure_callback:
- task.on_failure_callback(context)
- except Exception as e3:
- self.log.error("Failed at executing callback")
- self.log.exception(e3)
+ if callback:
+ try:
+ callback(context)
+ except Exception as e3:
+ self.log.error("Failed at executing callback")
+ self.log.exception(e3)
if not test_mode:
session.merge(self)
@@ -1265,6 +1257,12 @@ class TaskInstance(Base, LoggingMixin):
"""Is task instance is eligible for retry"""
return self.task.retries and self.try_number <= self.max_tries
+ def _safe_date(self, date_attr, fmt):
+ result = getattr(self, date_attr, None)
+ if result is not None:
+ return result.strftime(fmt)
+ return ''
+
@provide_session
def get_template_context(self, session=None) -> Dict[str, Any]:
task = self.task
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index d3513b7..ed1a910 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -357,10 +357,21 @@ persisted in the database.
airflow/models/index
+.. _pythonapi:exceptions:
+
+Exceptions
+----------
+
+.. toctree::
+ :includehidden:
+ :glob:
+ :maxdepth: 1
+
+ airflow/exceptions/index
Secrets Backends
----------------
-Airflow uses relies on secrets backends to retrieve
:class:`~airflow.models.connection.Connection` objects.
+Airflow relies on secrets backends to retrieve
:class:`~airflow.models.connection.Connection` objects.
All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.
.. toctree::
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 60ceec6..c8d5942 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1216,6 +1216,51 @@ template string:
See `Jinja documentation
<https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_
to find all available options.
+.. _exceptions:
+
+Exceptions
+==========
+
+Airflow defines a number of exceptions; most of these are used internally, but
a few
+are relevant to authors of custom operators or python callables called from
``PythonOperator``
+tasks. Normally any exception raised from an ``execute`` method or python
callable will either
+cause a task instance to fail if it is not configured to retry or has reached
its limit on
+retry attempts, or to be marked as "up for retry". A few exceptions can be
used when different
+behavior is desired:
+
+* ``AirflowSkipException`` can be raised to set the state of the current task
instance to "skipped".
+* ``AirflowFailException`` can be raised to set the state of the current task
instance to "failed" regardless
+ of whether there are any retry attempts remaining.
+
+This example illustrates some possibilities:
+
+.. code:: python
+
+ from airflow.exceptions import AirflowFailException, AirflowSkipException
+
+ def fetch_data():
+ try:
+ data = get_some_data(get_api_key())
+ if not data:
+ # Set state to skipped and do not retry
+ # Downstream task behavior will be determined by trigger rules
+ raise AirflowSkipException("No data available.")
+ except Unauthorized:
+ # If we retry, our api key will still be bad, so don't waste time
retrying!
+ # Set state to failed and move on
+ raise AirflowFailException("Our api key is bad!")
+ except TransientError:
+ print("Looks like there was a blip.")
+ # Raise the exception and let the task retry unless max attempts
were reached
+ raise
+ handle(data)
+
+ task = PythonOperator(task_id="fetch_data", python_callable=fetch_data,
retries=10)
+
+.. seealso::
+ - :ref:`List of Airflow exceptions <pythonapi:exceptions>`
+
+
Packaged DAGs
'''''''''''''
While often you will specify DAGs in a single ``.py`` file it might sometimes
diff --git a/docs/conf.py b/docs/conf.py
index d69fd91..95d2f5e 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -211,9 +211,13 @@ exclude_patterns: List[str] = [
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
# Generate top-level
+
+# do not exclude these top-level modules from the doc build:
+allowed_top_level = ("exceptions.py",)
+
for path in glob(f"{ROOT_DIR}/airflow/*"):
name = os.path.basename(path)
- if os.path.isfile(path):
+ if os.path.isfile(path) and not path.endswith(allowed_top_level):
exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}")
browsable_packages = ["operators", "hooks", "sensors", "providers",
"executors", "models", "secrets"]
if os.path.isdir(path) and name not in browsable_packages:
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 631769e..52feaa5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -31,7 +31,7 @@ from parameterized import param, parameterized
from sqlalchemy.orm.session import Session
from airflow import models, settings
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowFailException,
AirflowSkipException
from airflow.models import (
DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as
TI, TaskReschedule, Variable,
)
@@ -1514,6 +1514,62 @@ class TestTaskInstance(unittest.TestCase):
context_arg_2 = mock_on_retry_2.call_args[0][0]
assert context_arg_2 and "task_instance" in context_arg_2
+ # test the scenario where normally we would retry but have been asked
to fail
+ mock_on_failure_3 = mock.MagicMock()
+ mock_on_retry_3 = mock.MagicMock()
+ task3 = DummyOperator(task_id="test_handle_failure_on_force_fail",
+ on_failure_callback=mock_on_failure_3,
+ on_retry_callback=mock_on_retry_3,
+ retries=1,
+ dag=dag)
+ ti3 = TI(task=task3, execution_date=start_date)
+ ti3.state = State.FAILED
+ ti3.handle_failure("test force_fail handling", force_fail=True)
+
+ context_arg_3 = mock_on_failure_3.call_args[0][0]
+ assert context_arg_3 and "task_instance" in context_arg_3
+ mock_on_retry_3.assert_not_called()
+
+ def test_does_not_retry_on_airflow_fail_exception(self):
+ def fail():
+ raise AirflowFailException("hopeless")
+
+ dag =
models.DAG(dag_id='test_does_not_retry_on_airflow_fail_exception')
+ task = PythonOperator(
+ task_id='test_raise_airflow_fail_exception',
+ dag=dag,
+ python_callable=fail,
+ owner='airflow',
+ start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+ retries=1
+ )
+ ti = TI(task=task, execution_date=timezone.utcnow())
+ try:
+ ti.run()
+ except AirflowFailException:
+ pass # expected
+ self.assertEqual(State.FAILED, ti.state)
+
+ def test_retries_on_other_exceptions(self):
+ def fail():
+ raise AirflowException("maybe this will pass?")
+
+ dag = models.DAG(dag_id='test_retries_on_other_exceptions')
+ task = PythonOperator(
+ task_id='test_raise_other_exception',
+ dag=dag,
+ python_callable=fail,
+ owner='airflow',
+ start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+ retries=1
+ )
+ ti = TI(task=task, execution_date=timezone.utcnow())
+ try:
+ ti.run()
+ except AirflowException:
+ pass # expected
+ self.assertEqual(State.UP_FOR_RETRY, ti.state)
+
def _env_var_check_callback(self):
self.assertEqual('test_echo_env_variables',
os.environ['AIRFLOW_CTX_DAG_ID'])
self.assertEqual('hive_in_python_op',
os.environ['AIRFLOW_CTX_TASK_ID'])