This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch backport-5af913d-v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 687808802511843acb8fee9d3820dc9f2b215583 Author: Amogh Desai <[email protected]> AuthorDate: Wed Feb 18 13:15:00 2026 +0530 Do not dilute task failures with finalization errors during execution (#62070) (cherry picked from commit 5af913ddc3c4f7d8f7e5cae2a804fe85fb348e14) --- .../src/airflow/sdk/execution_time/task_runner.py | 5 ++- .../task_sdk/execution_time/test_task_runner.py | 47 +++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index a9a3b28cfc9..99fa849f4be 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1583,7 +1583,10 @@ def finalize( if getattr(ti.task, "overwrite_rtif_after_execution", False): log.debug("Overwriting Rendered template fields.") if ti.task.template_fields: - SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task))) + try: + SUPERVISOR_COMMS.send(SetRenderedFields(rendered_fields=_serialize_rendered_fields(ti.task))) + except Exception: + log.exception("Failed to set rendered fields during finalization", ti=ti, task=ti.task) log.debug("Running finalizers", ti=ti) if state == TaskInstanceState.SUCCESS: diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 7df042ca64f..e14d0828740 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -69,7 +69,7 @@ from airflow.sdk.bases.xcom import BaseXCom from airflow.sdk.definitions._internal.types import NOTSET, SET_DURING_EXECUTION, ArgNotSet from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, Dataset, Model from airflow.sdk.definitions.param import DagParam -from airflow.sdk.exceptions import ErrorType +from airflow.sdk.exceptions import 
AirflowRuntimeError, ErrorType from airflow.sdk.execution_time import task_runner from airflow.sdk.execution_time.comms import ( AssetEventResult, @@ -2027,6 +2027,51 @@ class TestRuntimeTaskInstance: msg=SetRenderedFields(rendered_fields={"bash_command": rendered_cmd}) ) + def test_overwrite_rtif_after_execution_handles_errors_gracefully( + self, create_runtime_ti, mock_supervisor_comms + ): + """ + Test that errors during SetRenderedFields in finalize() don't mask the original task error. + """ + + class TaskWithRTIF(BaseOperator): + overwrite_rtif_after_execution = True + template_fields = ["command"] + + def __init__(self, command, *args, **kwargs): + self.command = command + super().__init__(*args, **kwargs) + + task = TaskWithRTIF(task_id="test_task", command="test command") + runtime_ti = create_runtime_ti(task=task) + mock_log = mock.MagicMock() + + # mock the SetRenderedFields call to fail with API_SERVER_ERROR + mock_supervisor_comms.send.side_effect = AirflowRuntimeError( + error=ErrorResponse( + error=ErrorType.API_SERVER_ERROR, + detail={ + "status_code": 404, + "message": "Not Found", + "detail": {"detail": "Not Found"}, + }, + ) + ) + + finalize( + runtime_ti, + state=TaskInstanceState.FAILED, + context=runtime_ti.get_template_context(), + log=mock_log, + error=Exception("Task execution failed"), + ) + + mock_log.exception.assert_called_once_with( + "Failed to set rendered fields during finalization", + ti=runtime_ti, + task=task, + ) + @pytest.mark.parametrize( ["task_reschedule_count", "expected_date"], [
