This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new daa8b8344f2 [v3-0-test] Add exception to context for task callbacks 
(#52055) (#52066)
daa8b8344f2 is described below

commit daa8b8344f200e314c12d4239e33ff4b9cf75a27
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jun 23 16:59:12 2025 +0530

    [v3-0-test] Add exception to context for task callbacks (#52055) (#52066)
    
    (cherry picked from commit 8d48d53df5736c376abdcdcb8378a2f08d92905b)
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 task-sdk/src/airflow/sdk/execution_time/task_runner.py     |  3 ++-
 task-sdk/tests/task_sdk/execution_time/test_task_runner.py | 11 +++++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 33583ffe9c7..a515c1a7212 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1292,7 +1292,8 @@ def main():
             bundle_name=ti.bundle_instance.name,
             bundle_version=ti.bundle_instance.version,
         ):
-            state, msg, error = run(ti, context, log)
+            state, _, error = run(ti, context, log)
+            context["exception"] = error
             finalize(ti, state, context, log, error)
     except KeyboardInterrupt:
         log = structlog.get_logger(logger_name="task")
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 09d4d9709e7..4b47fb57aa5 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -2277,6 +2277,9 @@ class TestTaskRunnerCallsListeners:
 
 @pytest.mark.usefixtures("mock_supervisor_comms")
 class TestTaskRunnerCallsCallbacks:
+    class _Failure(Exception):
+        """Exception raised in a failed execution and received by the failure 
callback."""
+
     def _execute_success(self, context):
         self.results.append("execute success")
 
@@ -2288,7 +2291,7 @@ class TestTaskRunnerCallsCallbacks:
 
     def _execute_failure(self, context):
         self.results.append("execute failure")
-        raise Exception("sorry!")
+        raise self._Failure("sorry!")
 
     @pytest.mark.parametrize(
         "execute_impl, should_retry, expected_state, expected_results",
@@ -2336,6 +2339,10 @@ class TestTaskRunnerCallsCallbacks:
         def custom_callback(context, *, kind):
             collected_results.append(f"on-{kind} callback")
 
+        def failure_callback(context):
+            custom_callback(context, kind="failure")
+            assert isinstance(context["exception"], self._Failure)
+
         class CustomOperator(BaseOperator):
             results = collected_results
             execute = execute_impl
@@ -2345,7 +2352,7 @@ class TestTaskRunnerCallsCallbacks:
             on_execute_callback=functools.partial(custom_callback, 
kind="execute"),
             on_skipped_callback=functools.partial(custom_callback, 
kind="skipped"),
             on_success_callback=functools.partial(custom_callback, 
kind="success"),
-            on_failure_callback=functools.partial(custom_callback, 
kind="failure"),
+            on_failure_callback=failure_callback,
             on_retry_callback=functools.partial(custom_callback, kind="retry"),
         )
         runtime_ti = create_runtime_ti(dag_id="dag", task=task, 
should_retry=should_retry)

Reply via email to