This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5af913ddc3c Do not dilute task failures with finalization errors during execution (#62070)
5af913ddc3c is described below
commit 5af913ddc3c4f7d8f7e5cae2a804fe85fb348e14
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Feb 18 13:15:00 2026 +0530
Do not dilute task failures with finalization errors during execution (#62070)
---
.../src/airflow/sdk/execution_time/task_runner.py | 5 ++-
.../task_sdk/execution_time/test_task_runner.py | 46 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index a569732c5f7..dbf8e7f12ec 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1713,7 +1713,10 @@ def finalize(
if getattr(ti.task, "overwrite_rtif_after_execution", False):
log.debug("Overwriting Rendered template fields.")
if ti.task.template_fields:
- SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
+ try:
+ SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task)))
+ except Exception:
+ log.exception("Failed to set rendered fields during finalization", ti=ti, task=ti.task)
log.debug("Running finalizers", ti=ti)
if state == TaskInstanceState.SUCCESS:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 571caedc87f..3de79732ff5 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -65,6 +65,7 @@ from airflow.sdk.exceptions import (
AirflowException,
AirflowFailException,
AirflowRescheduleException,
+ AirflowRuntimeError,
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTerminated,
@@ -2179,6 +2180,51 @@ class TestRuntimeTaskInstance:
msg=SetRenderedFields(rendered_fields={"bash_command": rendered_cmd})
)
+ def test_overwrite_rtif_after_execution_handles_errors_gracefully(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
+ """
+ Test that errors during SetRenderedFields in finalize() don't mask the original task error.
+ """
+
+ class TaskWithRTIF(BaseOperator):
+ overwrite_rtif_after_execution = True
+ template_fields = ["command"]
+
+ def __init__(self, command, *args, **kwargs):
+ self.command = command
+ super().__init__(*args, **kwargs)
+
+ task = TaskWithRTIF(task_id="test_task", command="test command")
+ runtime_ti = create_runtime_ti(task=task)
+ mock_log = mock.MagicMock()
+
+ # mock the SetRenderedFields call to fail with API_SERVER_ERROR
+ mock_supervisor_comms.send.side_effect = AirflowRuntimeError(
+ error=ErrorResponse(
+ error=ErrorType.API_SERVER_ERROR,
+ detail={
+ "status_code": 404,
+ "message": "Not Found",
+ "detail": {"detail": "Not Found"},
+ },
+ )
+ )
+
+ finalize(
+ runtime_ti,
+ state=TaskInstanceState.FAILED,
+ context=runtime_ti.get_template_context(),
+ log=mock_log,
+ error=Exception("Task execution failed"),
+ )
+
+ mock_log.exception.assert_called_once_with(
+ "Failed to set rendered fields during finalization",
+ ti=runtime_ti,
+ task=task,
+ )
+
@pytest.mark.parametrize(
("task_reschedule_count", "expected_date"),
[