This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new faf9f73 Handle and log exceptions raised during task callback (#17347)
faf9f73 is described below
commit faf9f731fa8810e05f868ffec989ea042381ada4
Author: Sam Wheating <[email protected]>
AuthorDate: Fri Aug 6 03:35:13 2021 -0700
Handle and log exceptions raised during task callback (#17347)
Add missing exception handling in success/retry/failure callbacks
---
airflow/models/taskinstance.py | 15 ++++++++++++---
tests/models/test_taskinstance.py | 39 +++++++++++++++++++++++++++++++++++++++
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 1b76145..e39f54a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1360,18 +1360,27 @@ class TaskInstance(Base, LoggingMixin):
if task.on_failure_callback is not None:
context = self.get_template_context()
context["exception"] = error
- task.on_failure_callback(context)
+ try:
+ task.on_failure_callback(context)
+ except Exception:
+ self.log.exception("Error when executing on_failure_callback")
elif self.state == State.SUCCESS:
task = self.task
if task.on_success_callback is not None:
context = self.get_template_context()
- task.on_success_callback(context)
+ try:
+ task.on_success_callback(context)
+ except Exception:
+ self.log.exception("Error when executing on_success_callback")
elif self.state == State.UP_FOR_RETRY:
task = self.task
if task.on_retry_callback is not None:
context = self.get_template_context()
context["exception"] = error
- task.on_retry_callback(context)
+ try:
+ task.on_retry_callback(context)
+ except Exception:
+ self.log.exception("Error when executing on_retry_callback")
@provide_session
def run(
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 7d64c76..2e5eb8a 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1743,6 +1743,45 @@ class TestTaskInstance(unittest.TestCase):
ti.refresh_from_db()
assert ti.state == State.SUCCESS
+ @parameterized.expand(
+ [
+ (State.SUCCESS, "Error when executing on_success_callback"),
+ (State.UP_FOR_RETRY, "Error when executing on_retry_callback"),
+ (State.FAILED, "Error when executing on_failure_callback"),
+ ]
+ )
+ def test_finished_callbacks_handle_and_log_exception(self, finished_state, expected_message):
+ called = completed = False
+
+ def on_finish_callable(context):
+ nonlocal called, completed
+ called = True
+ raise KeyError
+ completed = True
+
+ dag = DAG(
+ 'test_success_callback_handles_exception',
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ )
+ task = DummyOperator(
+ task_id='op',
+ email='[email protected]',
+ on_success_callback=on_finish_callable,
+ on_retry_callback=on_finish_callable,
+ on_failure_callback=on_finish_callable,
+ dag=dag,
+ )
+
+ ti = TI(task=task, execution_date=datetime.datetime.now())
+ ti._log = mock.Mock()
+ ti.state = finished_state
+ ti._run_finished_callback()
+
+ assert called
+ assert not completed
+ ti.log.exception.assert_called_once_with(expected_message)
+
def test_handle_failure(self):
start_date = timezone.datetime(2016, 6, 1)
dag = models.DAG(dag_id="test_handle_failure", schedule_interval=None, start_date=start_date)