This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new cbb9e5108fd [v3-2-test] Improve Post-Task logs to show exception in failure (#66735) (#66920)
cbb9e5108fd is described below

commit cbb9e5108fdd89a36f0774f81d0256b1e2ed9ed5
Author: Jens Scheffler <[email protected]>
AuthorDate: Thu May 14 14:03:27 2026 +0200

    [v3-2-test] Improve Post-Task logs to show exception in failure (#66735) (#66920)
    
    (cherry picked from commit a641d83da9c0858e19e076fa9690a0485a3d5233)
---
 task-sdk/src/airflow/sdk/execution_time/task_runner.py | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 0f89596cda2..f41d4709545 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1271,6 +1271,7 @@ def run(
 
             try:
                 result = _execute_task(context=context, ti=ti, log=log)
+                log.info("::group::Post Execute")
             except Exception:
                 import jinja2
 
@@ -1290,22 +1291,24 @@ def run(
                 # Send update only if value changed (e.g., user set context variables during execution)
                 if ti.rendered_map_index and ti.rendered_map_index != previous_rendered_map_index:
                     SUPERVISOR_COMMS.send(msg=SetRenderedMapIndex(rendered_map_index=ti.rendered_map_index))
-            finally:
-                log.info("::group::Post Execute")
 
         _push_xcom_if_needed(result, ti, log)
 
         msg, state = _handle_current_task_success(context, ti)
     except DownstreamTasksSkipped as skip:
+        log.info("::group::Post Execute")
         log.info("Skipping downstream tasks.")
         tasks_to_skip = skip.tasks if isinstance(skip.tasks, list) else [skip.tasks]
         SUPERVISOR_COMMS.send(msg=SkipDownstreamTasks(tasks=tasks_to_skip))
         msg, state = _handle_current_task_success(context, ti)
     except DagRunTriggerException as drte:
+        log.info("::group::Post Execute")
         msg, state = _handle_trigger_dag_run(drte, context, ti, log)
     except TaskDeferred as defer:
+        log.info("::group::Post Execute")
         msg, state = _defer_task(defer, ti, log)
     except AirflowSkipException as e:
+        log.info("::group::Post Execute")
         if e.args:
             log.info("Skipping task.", reason=e.args[0])
         msg = TaskState(
@@ -1315,6 +1318,7 @@ def run(
         )
         state = TaskInstanceState.SKIPPED
     except AirflowRescheduleException as reschedule:
+        log.info("::group::Post Execute")
         log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
         msg = RescheduleTask(
             reschedule_date=reschedule.reschedule_date, end_date=datetime.now(tz=timezone.utc)
@@ -1324,6 +1328,7 @@ def run(
         # If AirflowFailException is raised, task should not retry.
         # If a sensor in reschedule mode reaches timeout, task should not retry.
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         ti.end_date = datetime.now(tz=timezone.utc)
         msg = TaskState(
             state=TaskInstanceState.FAILED,
@@ -1335,6 +1340,7 @@ def run(
     except (AirflowTaskTimeout, AirflowException, AirflowRuntimeError) as e:
         # We should allow retries if the task has defined it.
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         msg, state = _handle_current_task_failed(ti)
         error = e
     except AirflowTaskTerminated as e:
@@ -1342,6 +1348,7 @@ def run(
         # updated already be another UI API. So, these exceptions should ideally never be thrown.
         # If these are thrown, we should mark the TI state as failed.
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         ti.end_date = datetime.now(tz=timezone.utc)
         msg = TaskState(
             state=TaskInstanceState.FAILED,
@@ -1353,10 +1360,12 @@ def run(
     except SystemExit as e:
         # SystemExit needs to be retried if they are eligible.
         log.error("Task exited", exit_code=e.code)
+        log.info("::group::Post Execute")
         msg, state = _handle_current_task_failed(ti)
         error = e
     except BaseException as e:
         log.exception("Task failed with exception")
+        log.info("::group::Post Execute")
         msg, state = _handle_current_task_failed(ti)
         error = e
     finally:

Reply via email to