This is an automated email from the ASF dual-hosted git repository.

mobuchowski pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 53dcbce745 Give `on_task_instance_failed` access to the error that caused the failure (#38155)
53dcbce745 is described below

commit 53dcbce745bbb8ae4e48a55a58daa36d8b5173bd
Author: Raphaël Vandon <[email protected]>
AuthorDate: Fri Apr 12 10:54:26 2024 +0200

    Give `on_task_instance_failed` access to the error that caused the failure (#38155)
    
    * give on_task_instance_failed access to the error that caused the failure
    
    * store error in thread-local storage && update sample dag
    
    * add sample code to the listeners doc
    
    * test that the error is accessible on callback
    
    * fix stuff detected by static checks
    
    * add _thread_local_data to TaskInstancePydantic as well
    
    it's a possible type in handle_failure
    
    * mention error recovery mechanism in doc
    
    * replace previous solution with a new parameter
---
 airflow/example_dags/plugins/event_listener.py     |  7 +++-
 airflow/listeners/spec/taskinstance.py             |  5 ++-
 airflow/models/taskinstance.py                     |  3 +-
 .../administration-and-deployment/listeners.rst    | 42 ++++++++++++++++++++--
 tests/listeners/class_listener.py                  |  4 ++-
 tests/listeners/file_write_listener.py             |  4 ++-
 tests/listeners/full_listener.py                   |  2 +-
 tests/models/test_taskinstance.py                  | 16 +++++++--
 8 files changed, 72 insertions(+), 11 deletions(-)

diff --git a/airflow/example_dags/plugins/event_listener.py 
b/airflow/example_dags/plugins/event_listener.py
index 43d9df9727..4b9be307c4 100644
--- a/airflow/example_dags/plugins/event_listener.py
+++ b/airflow/example_dags/plugins/event_listener.py
@@ -89,7 +89,9 @@ def on_task_instance_success(previous_state: 
TaskInstanceState, task_instance: T
 
 # [START howto_listen_ti_failure_task]
 @hookimpl
-def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: 
TaskInstance, session):
+def on_task_instance_failed(
+    previous_state: TaskInstanceState, task_instance: TaskInstance, error: 
None | str | BaseException, session
+):
     """
     This method is called when task state changes to FAILED.
     Through callback, parameters like previous_task_state, task_instance 
object can be accessed.
@@ -113,6 +115,8 @@ def on_task_instance_failed(previous_state: 
TaskInstanceState, task_instance: Ta
 
     print(f"Task start:{start_date} end:{end_date} duration:{duration}")
     print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
+    if error:
+        print(f"Failure caused by {error}")
 
 
 # [END howto_listen_ti_failure_task]
@@ -146,6 +150,7 @@ def on_dag_run_failed(dag_run: DagRun, msg: str):
     external_trigger = dag_run.external_trigger
 
     print(f"Dag information:{dag_id} Run id: {run_id} external trigger: 
{external_trigger}")
+    print(f"Failed with message: {msg}")
 
 
 # [END howto_listen_dagrun_failure_task]
diff --git a/airflow/listeners/spec/taskinstance.py 
b/airflow/listeners/spec/taskinstance.py
index 03f0a00478..f012de0aac 100644
--- a/airflow/listeners/spec/taskinstance.py
+++ b/airflow/listeners/spec/taskinstance.py
@@ -46,6 +46,9 @@ def on_task_instance_success(
 
 @hookspec
 def on_task_instance_failed(
-    previous_state: TaskInstanceState | None, task_instance: TaskInstance, 
session: Session | None
+    previous_state: TaskInstanceState | None,
+    task_instance: TaskInstance,
+    error: None | str | BaseException,
+    session: Session | None,
 ):
     """Execute when task state changes to FAIL. previous_state can be None."""
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c3bf25e343..b07aed936d 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1413,6 +1413,7 @@ class TaskInstance(Base, LoggingMixin):
         cascade="all, delete, delete-orphan",
     )
     note = association_proxy("task_instance_note", "content", 
creator=_creator_note)
+
     task: Operator | None = None
     test_mode: bool = False
     is_trigger_log_context: bool = False
@@ -2934,7 +2935,7 @@ class TaskInstance(Base, LoggingMixin):
     ):
         """Handle Failure for the TaskInstance."""
         get_listener_manager().hook.on_task_instance_failed(
-            previous_state=TaskInstanceState.RUNNING, task_instance=ti, 
session=session
+            previous_state=TaskInstanceState.RUNNING, task_instance=ti, 
error=error, session=session
         )
 
         if error:
diff --git a/docs/apache-airflow/administration-and-deployment/listeners.rst 
b/docs/apache-airflow/administration-and-deployment/listeners.rst
index 19f5d27646..655b7de010 100644
--- a/docs/apache-airflow/administration-and-deployment/listeners.rst
+++ b/docs/apache-airflow/administration-and-deployment/listeners.rst
@@ -34,21 +34,57 @@ Lifecycle events allow you to react to start and stop 
events for an Airflow ``Jo
 DagRun State Change Events
 --------------------------
 
+DagRun state change events occur when a :class:`~airflow.models.dagrun.DagRun` 
changes state.
+
 - ``on_dag_run_running``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_dagrun_running_task]
+    :end-before: [END howto_listen_dagrun_running_task]
+
 - ``on_dag_run_success``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_dagrun_success_task]
+    :end-before: [END howto_listen_dagrun_success_task]
+
 - ``on_dag_run_failed``
 
-DagRun state change events occur when a ``DagRun`` changes state.
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_dagrun_failure_task]
+    :end-before: [END howto_listen_dagrun_failure_task]
+
 
 TaskInstance State Change Events
 --------------------------------
 
+TaskInstance state change events occur when a 
:class:`~airflow.models.taskinstance.TaskInstance` changes state.
+You can use these events to react to ``LocalTaskJob`` state changes.
+
 - ``on_task_instance_running``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_ti_running_task]
+    :end-before: [END howto_listen_ti_running_task]
+
 - ``on_task_instance_success``
+
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_ti_success_task]
+    :end-before: [END howto_listen_ti_success_task]
+
 - ``on_task_instance_failed``
 
-TaskInstance state change events occur when a ``TaskInstance`` changes state.
-You can use these events to react to ``LocalTaskJob`` state changes.
+.. exampleinclude:: /../../airflow/example_dags/plugins/event_listener.py
+    :language: python
+    :start-after: [START howto_listen_ti_failure_task]
+    :end-before: [END howto_listen_ti_failure_task]
+
 
 Dataset Events
 --------------
diff --git a/tests/listeners/class_listener.py 
b/tests/listeners/class_listener.py
index e36cffa5ac..a719d372bd 100644
--- a/tests/listeners/class_listener.py
+++ b/tests/listeners/class_listener.py
@@ -47,7 +47,9 @@ class ClassBasedListener:
         self.state.append(TaskInstanceState.SUCCESS)
 
     @hookimpl
-    def on_task_instance_failed(self, previous_state, task_instance, session):
+    def on_task_instance_failed(
+        self, previous_state, task_instance, error: None | str | 
BaseException, session
+    ):
         self.state.append(TaskInstanceState.FAILED)
 
 
diff --git a/tests/listeners/file_write_listener.py 
b/tests/listeners/file_write_listener.py
index f134f9f3e6..f550254d9b 100644
--- a/tests/listeners/file_write_listener.py
+++ b/tests/listeners/file_write_listener.py
@@ -42,7 +42,9 @@ class FileWriteListener:
         self.write("on_task_instance_success")
 
     @hookimpl
-    def on_task_instance_failed(self, previous_state, task_instance, session):
+    def on_task_instance_failed(
+        self, previous_state, task_instance, error: None | str | 
BaseException, session
+    ):
         self.write("on_task_instance_failed")
 
     @hookimpl
diff --git a/tests/listeners/full_listener.py b/tests/listeners/full_listener.py
index 7fbaefdb0d..229fdab676 100644
--- a/tests/listeners/full_listener.py
+++ b/tests/listeners/full_listener.py
@@ -50,7 +50,7 @@ def on_task_instance_success(previous_state, task_instance, 
session):
 
 
 @hookimpl
-def on_task_instance_failed(previous_state, task_instance, session):
+def on_task_instance_failed(previous_state, task_instance, error: None | str | 
BaseException, session):
     state.append(TaskInstanceState.FAILED)
 
 
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 35c4b745b7..4e42882dd7 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2882,6 +2882,11 @@ class TestTaskInstance:
         start_date = timezone.datetime(2016, 6, 1)
         clear_db_runs()
 
+        from airflow.listeners.listener import get_listener_manager
+
+        listener_callback_on_error = mock.MagicMock()
+        get_listener_manager().pm.hook.on_task_instance_failed = 
listener_callback_on_error
+
         mock_on_failure_1 = mock.MagicMock()
         mock_on_retry_1 = mock.MagicMock()
         dag, task1 = create_dummy_dag(
@@ -2901,11 +2906,18 @@ class TestTaskInstance:
             state=None,
             session=session,
         )
-
         ti1 = dr.get_task_instance(task1.task_id, session=session)
         ti1.task = task1
+
         ti1.state = State.FAILED
-        ti1.handle_failure("test failure handling")
+        error_message = "test failure handling"
+        ti1.handle_failure(error_message)
+
+        # check that the listener callback was called, and that it can access 
the error
+        listener_callback_on_error.assert_called_once()
+        callback_args = listener_callback_on_error.call_args.kwargs
+        assert "error" in callback_args
+        assert callback_args["error"] == error_message
 
         context_arg_1 = mock_on_failure_1.call_args.args[0]
         assert context_arg_1

Reply via email to