This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a641d83da9c Improve Post-Task logs to show exception in failure 
(#66735)
a641d83da9c is described below

commit a641d83da9c0858e19e076fa9690a0485a3d5233
Author: Jens Scheffler <[email protected]>
AuthorDate: Thu May 14 10:31:05 2026 +0200

    Improve Post-Task logs to show exception in failure (#66735)
---
 task-sdk/src/airflow/sdk/execution_time/task_runner.py | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 7c318fc499e..2cb336c8ea7 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1306,6 +1306,7 @@ def run(
 
             try:
                 result = _execute_task(context=context, ti=ti, log=log)
+                log.info("::group::Post Execute")
             except Exception:
                 import jinja2
 
@@ -1325,22 +1326,24 @@ def run(
                 # Send update only if value changed (e.g., user set context 
variables during execution)
                 if ti.rendered_map_index and ti.rendered_map_index != 
previous_rendered_map_index:
                     
SUPERVISOR_COMMS.send(msg=SetRenderedMapIndex(rendered_map_index=ti.rendered_map_index))
-            finally:
-                log.info("::group::Post Execute")
 
         _push_xcom_if_needed(result, ti, log)
 
         msg, state = _handle_current_task_success(context, ti)
     except DownstreamTasksSkipped as skip:
+        log.info("::group::Post Execute")
         log.info("Skipping downstream tasks.")
         tasks_to_skip = skip.tasks if isinstance(skip.tasks, list) else 
[skip.tasks]
         SUPERVISOR_COMMS.send(msg=SkipDownstreamTasks(tasks=tasks_to_skip))
         msg, state = _handle_current_task_success(context, ti)
     except DagRunTriggerException as drte:
+        log.info("::group::Post Execute")
         msg, state = _handle_trigger_dag_run(drte, context, ti, log)
     except TaskDeferred as defer:
+        log.info("::group::Post Execute")
         msg, state = _defer_task(defer, ti, log)
     except AirflowSkipException as e:
+        log.info("::group::Post Execute")
         if e.args:
             log.info("Skipping task.", reason=e.args[0])
         msg = TaskState(
@@ -1350,6 +1353,7 @@ def run(
         )
         state = TaskInstanceState.SKIPPED
     except AirflowRescheduleException as reschedule:
+        log.info("::group::Post Execute")
         log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
         msg = RescheduleTask(
             reschedule_date=reschedule.reschedule_date, 
end_date=datetime.now(tz=timezone.utc)
@@ -1359,6 +1363,7 @@ def run(
         # If AirflowFailException is raised, task should not retry.
         # If a sensor in reschedule mode reaches timeout, task should not 
retry.
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         ti.end_date = datetime.now(tz=timezone.utc)
         msg = TaskState(
             state=TaskInstanceState.FAILED,
@@ -1370,6 +1375,7 @@ def run(
     except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError) as e:
         # We should allow retries if the task has defined it.
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         msg, state = _apply_retry_policy_or_default(ti, e, log, context)
         error = e
     except AirflowTaskTerminated as e:
@@ -1377,6 +1383,7 @@ def run(
         # updated already be another UI API. So, these exceptions should 
ideally never be thrown.
         # If these are thrown, we should mark the TI state as failed.
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         ti.end_date = datetime.now(tz=timezone.utc)
         msg = TaskState(
             state=TaskInstanceState.FAILED,
@@ -1388,10 +1395,12 @@ def run(
     except SystemExit as e:
         # SystemExit needs to be retried if they are eligible.
         log.error("Task exited", exit_code=e.code)
+        log.info("::group::Post Execute")
         msg, state = _apply_retry_policy_or_default(ti, e, log, context)
         error = e
     except BaseException as e:
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         msg, state = _apply_retry_policy_or_default(ti, e, log, context)
         error = e
     finally:

Reply via email to