This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 5e94fe9  [AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)
5e94fe9 is described below

commit 5e94fe9732e31c8ff1af44d2feff0d0f33bcbe25
Author: Jonathan Stern <jonathan.st...@iprospect.com>
AuthorDate: Sat May 16 12:53:12 2020 -0500

    [AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)

    * use preferred boolean check idiom

    Co-Authored-By: Jarek Potiuk <ja...@potiuk.com>

    * add test coverage for AirflowFailException

    * add docs for some exception usage patterns

    * autoformatting

    * remove extraneous newline, poke travis build

    * clean up TaskInstance.handle_failure

    Try to reduce nesting and repetition of logic for different conditions.
    Also try to tighten up the scope of the exception handling ... it looks
    like the large block that catches an Exception and logs it as a failure
    to send an email may have been swallowing some TypeErrors coming out of
    trying to compose a log info message and calling strftime on start_date
    and end_date when they're set to None; this is why I've added lines in
    the test to set those values on the TaskInstance objects.

    * let sphinx generate docs for exceptions module

    * keep session kwarg last in handle_failure

    * explain allowed_top_level

    * add black-box tests for retry/fail immediately cases

    * don't lose safety measures in logging date attrs

    * fix flake8 too few blank lines

    * grammar nitpick

    * add import to AirflowFailException example

    Co-authored-by: Jarek Potiuk <ja...@potiuk.com>
    (cherry picked from commit 707bb0c725fbc32929eea162993aa8fb9854fa9a)
---
 airflow/exceptions.py             |   4 ++
 airflow/models/taskinstance.py    | 112 +++++++++++++++++++-------------
 docs/autoapi_templates/index.rst  |  13 ++++-
 docs/concepts.rst                 |  45 +++++++++++++++
 docs/conf.py                      |   1 -
 tests/models/test_taskinstance.py |  58 +++++++++++++++++++-
 6 files changed, 173 insertions(+), 60 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 10d310f..badf156 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -75,6 +75,10 @@ class AirflowSkipException(AirflowException):
     """Raise when the task should be skipped"""
 
 
+class AirflowFailException(AirflowException):
+    """Raise when the task should be failed without retrying"""
+
+
 class AirflowDagCycleException(AirflowException):
     """Raise when there is a cycle in Dag definition"""
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 242cfe3..0e5d6e3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -43,7 +43,8 @@ from sqlalchemy.orm.session import Session
 from airflow import settings
 from airflow.configuration import conf
 from airflow.exceptions import (
-    AirflowException, AirflowTaskTimeout, AirflowSkipException, AirflowRescheduleException
+    AirflowException, AirflowFailException, AirflowRescheduleException, AirflowSkipException,
+    AirflowTaskTimeout,
 )
 from airflow.models.base import Base, ID_LEN
 from airflow.models.log import Log
@@ -1026,6 +1027,10 @@ class TaskInstance(Base, LoggingMixin):
             self.refresh_from_db()
             self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
             return
+        except AirflowFailException as e:
+            self.refresh_from_db()
+            self.handle_failure(e, test_mode, context, force_fail=True)
+            raise
         except AirflowException as e:
             self.refresh_from_db()
             # for case when task is marked as success/failed externally
@@ -1136,7 +1141,7 @@ class TaskInstance(Base, LoggingMixin):
         self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')
 
     @provide_session
-    def handle_failure(self, error, test_mode=None, context=None, session=None):
+    def handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None):
         if test_mode is None:
             test_mode = self.test_mode
         if context is None:
@@ -1157,64 +1162,51 @@ class TaskInstance(Base, LoggingMixin):
         if context is not None:
             context['exception'] = error
 
-        # Let's go deeper
-        try:
-            # Since this function is called only when the TI state is running,
-            # try_number contains the current try_number (not the next). We
-            # only mark task instance as FAILED if the next task instance
-            # try_number exceeds the max_tries.
-            if self.is_eligible_to_retry():
-                self.state = State.UP_FOR_RETRY
-                self.log.info('Marking task as UP_FOR_RETRY')
-                if task.email_on_retry and task.email:
-                    self.email_alert(error)
+        # Set state correctly and figure out how to log it,
+        # what callback to call if any, and how to decide whether to email
+
+        # Since this function is called only when the TaskInstance state is running,
+        # try_number contains the current try_number (not the next). We
+        # only mark task instance as FAILED if the next task instance
+        # try_number exceeds the max_tries ... or if force_fail is truthy
+
+        if force_fail or not self.is_eligible_to_retry():
+            self.state = State.FAILED
+            if force_fail:
+                log_message = "Immediate failure requested. Marking task as FAILED."
             else:
-                self.state = State.FAILED
-                if task.retries:
-                    self.log.info(
-                        'All retries failed; marking task as FAILED.'
-                        'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-                        self.dag_id,
-                        self.task_id,
-                        self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'execution_date') and self.execution_date else '',
-                        self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'start_date') and self.start_date else '',
-                        self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'end_date') and self.end_date else '')
-                else:
-                    self.log.info(
-                        'Marking task as FAILED.'
-                        'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
-                        self.dag_id,
-                        self.task_id,
-                        self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'execution_date') and self.execution_date else '',
-                        self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'start_date') and self.start_date else '',
-                        self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
-                            self,
-                            'end_date') and self.end_date else '')
-                if task.email_on_failure and task.email:
-                    self.email_alert(error)
-        except Exception as e2:
-            self.log.error('Failed to send email to: %s', task.email)
-            self.log.exception(e2)
+                log_message = "Marking task as FAILED."
+            email_for_state = task.email_on_failure
+            callback = task.on_failure_callback
+        else:
+            self.state = State.UP_FOR_RETRY
+            log_message = "Marking task as UP_FOR_RETRY."
+            email_for_state = task.email_on_retry
+            callback = task.on_retry_callback
+
+        self.log.info(
+            '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
+            log_message,
+            self.dag_id,
+            self.task_id,
+            self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
+            self._safe_date('start_date', '%Y%m%dT%H%M%S'),
+            self._safe_date('end_date', '%Y%m%dT%H%M%S')
+        )
+        if email_for_state and task.email:
+            try:
+                self.email_alert(error)
+            except Exception as e2:
+                self.log.error('Failed to send email to: %s', task.email)
+                self.log.exception(e2)
 
         # Handling callbacks pessimistically
-        try:
-            if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
-                task.on_retry_callback(context)
-            if self.state == State.FAILED and task.on_failure_callback:
-                task.on_failure_callback(context)
-        except Exception as e3:
-            self.log.error("Failed at executing callback")
-            self.log.exception(e3)
+        if callback:
+            try:
+                callback(context)
+            except Exception as e3:
+                self.log.error("Failed at executing callback")
+                self.log.exception(e3)
 
         if not test_mode:
             session.merge(self)
@@ -1224,6 +1216,12 @@ class TaskInstance(Base, LoggingMixin):
         """Is task instance is eligible for retry"""
         return self.task.retries and self.try_number <= self.max_tries
 
+    def _safe_date(self, date_attr, fmt):
+        result = getattr(self, date_attr, None)
+        if result is not None:
+            return result.strftime(fmt)
+        return ''
+
     @provide_session
     def get_template_context(self, session=None):
         task = self.task
diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst
index d7f7a47..1be7ee7 100644
--- a/docs/autoapi_templates/index.rst
+++ b/docs/autoapi_templates/index.rst
@@ -119,6 +119,17 @@ persisted in the database.
 
   airflow/models/index
 
+.. _pythonapi:exceptions:
+
+Exceptions
+----------
+
+.. toctree::
+  :includehidden:
+  :glob:
+  :maxdepth: 1
+
+  airflow/exceptions/index
 
 Core and community package
 --------------------------
@@ -131,7 +142,7 @@ added only to the contrib package.
 Secrets Backends
 ----------------
 
-Airflow uses relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
+Airflow relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
 All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.
 
 .. toctree::
diff --git a/docs/concepts.rst b/docs/concepts.rst
index b365aba..edd6e5d 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -1223,6 +1223,51 @@ template string:
 
 See `Jinja documentation <https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_
 to find all available options.
 
+.. _exceptions:
+
+Exceptions
+==========
+
+Airflow defines a number of exceptions; most of these are used internally, but a few
+are relevant to authors of custom operators or python callables called from ``PythonOperator``
+tasks. Normally any exception raised from an ``execute`` method or python callable will either
+cause a task instance to fail if it is not configured to retry or has reached its limit on
+retry attempts, or to be marked as "up for retry". A few exceptions can be used when different
+behavior is desired:
+
+* ``AirflowSkipException`` can be raised to set the state of the current task instance to "skipped"
+* ``AirflowFailException`` can be raised to set the state of the current task to "failed" regardless
+  of whether there are any retry attempts remaining.
+
+This example illustrates some possibilities
+
+.. code:: python
+
+    from airflow.exceptions import AirflowFailException, AirflowSkipException
+
+    def fetch_data():
+        try:
+            data = get_some_data(get_api_key())
+            if not data:
+                # Set state to skipped and do not retry
+                # Downstream task behavior will be determined by trigger rules
+                raise AirflowSkipException("No data available.")
+        except Unauthorized:
+            # If we retry, our api key will still be bad, so don't waste time retrying!
+            # Set state to failed and move on
+            raise AirflowFailException("Our api key is bad!")
+        except TransientError:
+            print("Looks like there was a blip.")
+            # Raise the exception and let the task retry unless max attempts were reached
+            raise
+        handle(data)
+
+    task = PythonOperator(task_id="fetch_data", python_callable=fetch_data, retries=10)
+
+.. seealso::
+  - :ref:`List of Airflow exceptions <pythonapi:exceptions>`
+
 
 Packaged DAGs
 '''''''''''''
 
 While often you will specify DAGs in a single ``.py`` file it might sometimes
diff --git a/docs/conf.py b/docs/conf.py
index 67010b3..5f2a113 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -198,7 +198,6 @@ exclude_patterns = [
     '_api/airflow/dag',
     '_api/airflow/default_login',
     '_api/airflow/example_dags',
-    '_api/airflow/exceptions',
     '_api/airflow/index.rst',
     '_api/airflow/jobs',
     '_api/airflow/lineage',
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 4534a07..0c416a4 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -31,7 +31,7 @@ from sqlalchemy.orm.session import Session
 from airflow import models, settings
 from airflow.configuration import conf
 from airflow.contrib.sensors.python_sensor import PythonSensor
-from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException
 from airflow.models import (
     DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as TI, TaskReschedule, Variable,
 )
@@ -1408,6 +1408,22 @@ class TaskInstanceTest(unittest.TestCase):
         context_arg_2 = mock_on_retry_2.call_args[0][0]
         assert context_arg_2 and "task_instance" in context_arg_2
 
+        # test the scenario where normally we would retry but have been asked to fail
+        mock_on_failure_3 = mock.MagicMock()
+        mock_on_retry_3 = mock.MagicMock()
+        task3 = DummyOperator(task_id="test_handle_failure_on_force_fail",
+                              on_failure_callback=mock_on_failure_3,
+                              on_retry_callback=mock_on_retry_3,
+                              retries=1,
+                              dag=dag)
+        ti3 = TI(task=task3, execution_date=start_date)
+        ti3.state = State.FAILED
+        ti3.handle_failure("test force_fail handling", force_fail=True)
+
+        context_arg_3 = mock_on_failure_3.call_args[0][0]
+        assert context_arg_3 and "task_instance" in context_arg_3
+        mock_on_retry_3.assert_not_called()
+
     @parameterized.expand(
         [
             ('{{ var.value.a_variable }}', 'a test value'),
@@ -1474,6 +1490,46 @@ class TaskInstanceTest(unittest.TestCase):
         with self.assertRaises(KeyError):
             task.render_template('{{ var.json.get("missing_variable") }}', context)
 
+    def test_does_not_retry_on_airflow_fail_exception(self):
+        def fail():
+            raise AirflowFailException("hopeless")
+
+        dag = models.DAG(dag_id='test_does_not_retry_on_airflow_fail_exception')
+        task = PythonOperator(
+            task_id='test_raise_airflow_fail_exception',
+            dag=dag,
+            python_callable=fail,
+            owner='airflow',
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+            retries=1
+        )
+        ti = TI(task=task, execution_date=timezone.utcnow())
+        try:
+            ti.run()
+        except AirflowFailException:
+            pass  # expected
+        self.assertEqual(State.FAILED, ti.state)
+
+    def test_retries_on_other_exceptions(self):
+        def fail():
+            raise AirflowException("maybe this will pass?")
+
+        dag = models.DAG(dag_id='test_retries_on_other_exceptions')
+        task = PythonOperator(
+            task_id='test_raise_other_exception',
+            dag=dag,
+            python_callable=fail,
+            owner='airflow',
+            start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
+            retries=1
+        )
+        ti = TI(task=task, execution_date=timezone.utcnow())
+        try:
+            ti.run()
+        except AirflowException:
+            pass  # expected
+        self.assertEqual(State.UP_FOR_RETRY, ti.state)
+
     @parameterized.expand([
         (True, ),
         (False, )
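
The documentation example in the patch is deliberately schematic (``get_some_data``, ``get_api_key``, ``Unauthorized``, and ``TransientError`` are placeholders). As a self-contained sketch of the fail-fast behavior on a 1.10.x install that includes this patch, something like the following should run; the DAG id, task id, and the ``refresh_credentials`` callable are hypothetical names invented for illustration, and only ``AirflowFailException`` and its skip-the-retries semantics come from the commit above:

.. code:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.exceptions import AirflowFailException
    from airflow.operators.python_operator import PythonOperator


    def refresh_credentials():
        api_key = None  # stand-in for a credential lookup that came back empty
        if api_key is None:
            # A missing credential will not heal itself, so skip the
            # remaining retries and go straight to FAILED.
            raise AirflowFailException("No API key configured; retrying is pointless.")


    with DAG(dag_id="fail_fast_example",
             start_date=datetime(2020, 1, 1),
             schedule_interval=None) as dag:
        PythonOperator(
            task_id="refresh_credentials",
            python_callable=refresh_credentials,
            retries=3,  # ignored once AirflowFailException is raised
        )

The black-box tests above assert exactly this contrast: raising AirflowFailException lands the task instance in State.FAILED on the first try, whereas a plain AirflowException would mark it UP_FOR_RETRY until its retries are exhausted.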