This is an automated email from the ASF dual-hosted git repository.
mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 53dcbce745 Give `on_task_instance_failed` access to the error that
caused the failure (#38155)
53dcbce745 is described below
commit 53dcbce745bbb8ae4e48a55a58daa36d8b5173bd
Author: Raphaël Vandon <[email protected]>
AuthorDate: Fri Apr 12 10:54:26 2024 +0200
Give `on_task_instance_failed` access to the error that caused the failure
(#38155)
* give on_task_instance_failed access to the error that caused the failure
* store error in thread-local storage && update sample dag
* add sample code to the listeners doc
* test that the error is accessible on callback
* fix stuff detected by static checks
* add _thread_local_data to TaskInstancePydantic as well
it's a possible type in handle_failure
* mention error recovery mechanism in doc
* replace previous solution with a new parameter
---
airflow/example_dags/plugins/event_listener.py | 7 +++-
airflow/listeners/spec/taskinstance.py | 5 ++-
airflow/models/taskinstance.py | 3 +-
.../administration-and-deployment/listeners.rst | 42 ++++++++++++++++++++--
tests/listeners/class_listener.py | 4 ++-
tests/listeners/file_write_listener.py | 4 ++-
tests/listeners/full_listener.py | 2 +-
tests/models/test_taskinstance.py | 16 +++++++--
8 files changed, 72 insertions(+), 11 deletions(-)
diff --git a/airflow/example_dags/plugins/event_listener.py
b/airflow/example_dags/plugins/event_listener.py
index 43d9df9727..4b9be307c4 100644
--- a/airflow/example_dags/plugins/event_listener.py
+++ b/airflow/example_dags/plugins/event_listener.py
@@ -89,7 +89,9 @@ def on_task_instance_success(previous_state:
TaskInstanceState, task_instance: T
# [START howto_listen_ti_failure_task]
@hookimpl
-def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
+def on_task_instance_failed(
+ previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
+):
"""
This method is called when task state changes to FAILED.
Through callback, parameters like previous_task_state, task_instance
object can be accessed.
@@ -113,6 +115,8 @@ def on_task_instance_failed(previous_state:
TaskInstanceState, task_instance: Ta
print(f"Task start:{start_date} end:{end_date} duration:{duration}")
print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
+ if error:
+ print(f"Failure caused by {error}")
# [END howto_listen_ti_failure_task]
@@ -146,6 +150,7 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
external_trigger = dag_run.external_trigger
print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
+ print(f"Failed with message: {msg}")
# [END howto_listen_dagrun_failure_task]
diff --git a/airflow/listeners/spec/taskinstance.py
b/airflow/listeners/spec/taskinstance.py
index 03f0a00478..f012de0aac 100644
--- a/airflow/listeners/spec/taskinstance.py
+++ b/airflow/listeners/spec/taskinstance.py
@@ -46,6 +46,9 @@ def on_task_instance_success(
@hookspec
def on_task_instance_failed(
- previous_state: TaskInstanceState | None, task_instance: TaskInstance,
session: Session | None
+ previous_state: TaskInstanceState | None,
+ task_instance: TaskInstance,
+ error: None | str | BaseException,
+ session: Session | None,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c3bf25e343..b07aed936d 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1413,6 +1413,7 @@ class TaskInstance(Base, LoggingMixin):
cascade="all, delete, delete-orphan",
)
note = association_proxy("task_instance_note", "content",
creator=_creator_note)
+
task: Operator | None = None
test_mode: bool = False
is_trigger_log_context: bool = False
@@ -2934,7 +2935,7 @@ class TaskInstance(Base, LoggingMixin):
):
"""Handle Failure for the TaskInstance."""
get_listener_manager().hook.on_task_instance_failed(
- previous_state=TaskInstanceState.RUNNING, task_instance=ti, session=session
+ previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session
)
if error:
diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst
b/docs/apache-airflow/administration-and-deployment/listeners.rst
index 19f5d27646..655b7de010 100644
--- a/docs/apache-airflow/administration-and-deployment/listeners.rst
+++ b/docs/apache-airflow/administration-and-deployment/listeners.rst
@@ -34,21 +34,57 @@ Lifecycle events allow you to react to start and stop
events for an Airflow ``Jo
DagRun State Change Events
--------------------------
+DagRun state change events occur when a :class:`~airflow.models.dagrun.DagRun` changes state.
+
- ``on_dag_run_running``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_dagrun_running_task]
+ :end-before: [END howto_listen_dagrun_running_task]
+
- ``on_dag_run_success``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_dagrun_success_task]
+ :end-before: [END howto_listen_dagrun_success_task]
+
- ``on_dag_run_failed``
-DagRun state change events occur when a ``DagRun`` changes state.
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_dagrun_failure_task]
+ :end-before: [END howto_listen_dagrun_failure_task]
+
TaskInstance State Change Events
--------------------------------
+TaskInstance state change events occur when a :class:`~airflow.models.taskinstance.TaskInstance` changes state.
+You can use these events to react to ``LocalTaskJob`` state changes.
+
- ``on_task_instance_running``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_ti_running_task]
+ :end-before: [END howto_listen_ti_running_task]
+
- ``on_task_instance_success``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_ti_success_task]
+ :end-before: [END howto_listen_ti_success_task]
+
- ``on_task_instance_failed``
-TaskInstance state change events occur when a ``TaskInstance`` changes state.
-You can use these events to react to ``LocalTaskJob`` state changes.
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+ :language: python
+ :start-after: [START howto_listen_ti_failure_task]
+ :end-before: [END howto_listen_ti_failure_task]
+
Dataset Events
--------------
diff --git a/tests/listeners/class_listener.py
b/tests/listeners/class_listener.py
index e36cffa5ac..a719d372bd 100644
--- a/tests/listeners/class_listener.py
+++ b/tests/listeners/class_listener.py
@@ -47,7 +47,9 @@ class ClassBasedListener:
self.state.append(TaskInstanceState.SUCCESS)
@hookimpl
- def on_task_instance_failed(self, previous_state, task_instance, session):
+ def on_task_instance_failed(
+ self, previous_state, task_instance, error: None | str |
BaseException, session
+ ):
self.state.append(TaskInstanceState.FAILED)
diff --git a/tests/listeners/file_write_listener.py
b/tests/listeners/file_write_listener.py
index f134f9f3e6..f550254d9b 100644
--- a/tests/listeners/file_write_listener.py
+++ b/tests/listeners/file_write_listener.py
@@ -42,7 +42,9 @@ class FileWriteListener:
self.write("on_task_instance_success")
@hookimpl
- def on_task_instance_failed(self, previous_state, task_instance, session):
+ def on_task_instance_failed(
+ self, previous_state, task_instance, error: None | str |
BaseException, session
+ ):
self.write("on_task_instance_failed")
@hookimpl
diff --git a/tests/listeners/full_listener.py b/tests/listeners/full_listener.py
index 7fbaefdb0d..229fdab676 100644
--- a/tests/listeners/full_listener.py
+++ b/tests/listeners/full_listener.py
@@ -50,7 +50,7 @@ def on_task_instance_success(previous_state, task_instance,
session):
@hookimpl
-def on_task_instance_failed(previous_state, task_instance, session):
+def on_task_instance_failed(previous_state, task_instance, error: None | str |
BaseException, session):
state.append(TaskInstanceState.FAILED)
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 35c4b745b7..4e42882dd7 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2882,6 +2882,11 @@ class TestTaskInstance:
start_date = timezone.datetime(2016, 6, 1)
clear_db_runs()
+ from airflow.listeners.listener import get_listener_manager
+
+ listener_callback_on_error = mock.MagicMock()
+ get_listener_manager().pm.hook.on_task_instance_failed = listener_callback_on_error
+
mock_on_failure_1 = mock.MagicMock()
mock_on_retry_1 = mock.MagicMock()
dag, task1 = create_dummy_dag(
@@ -2901,11 +2906,18 @@ class TestTaskInstance:
state=None,
session=session,
)
-
ti1 = dr.get_task_instance(task1.task_id, session=session)
ti1.task = task1
+
ti1.state = State.FAILED
- ti1.handle_failure("test failure handling")
+ error_message = "test failure handling"
+ ti1.handle_failure(error_message)
+
+ # check that the listener callback was called, and that it can access the error
+ listener_callback_on_error.assert_called_once()
+ callback_args = listener_callback_on_error.call_args.kwargs
+ assert "error" in callback_args
+ assert callback_args["error"] == error_message
context_arg_1 = mock_on_failure_1.call_args.args[0]
assert context_arg_1