This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 41c49e23904 [v3-2-test] fix: guard the ti.finish stats emission (#67955) (#68099)
41c49e23904 is described below
commit 41c49e2390469b6e7320755691aa3786003eed59
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Jun 5 22:10:04 2026 +0200
[v3-2-test] fix: guard the ti.finish stats emission (#67955) (#68099)
* fix: guard the ti.finish stats emission
* fix: guard the ti.finish stats emission (fmt)
* fix: prek
(cherry picked from commit 7e55559406e442dd38f9b17a446f30629127e335)
Co-authored-by: Timothé Barbe <[email protected]>
---
.../src/airflow/sdk/execution_time/task_runner.py | 19 ++++++-----
.../task_sdk/execution_time/test_task_runner.py | 37 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 8 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index b231e902b3e..3fa63f9afae 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1254,7 +1254,7 @@ def run(
signal.signal(signal.SIGTERM, _on_term)
msg: ToSupervisor | None = None
- state: TaskInstanceState
+ state: TaskInstanceState | None = None
error: BaseException | None = None
stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
@@ -1383,12 +1383,15 @@ def run(
msg, state = _handle_current_task_failed(ti)
error = e
finally:
- Stats.incr(
- f"ti.finish.{ti.dag_id}.{ti.task_id}.{state.value}",
- tags=stats_tags,
- )
- # Same metric with tagging
- Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
+ # `state` may still be unset if an exception handler above raised before
+ # binding it
+ if state is not None:
+ Stats.incr(
+ f"ti.finish.{ti.dag_id}.{ti.task_id}.{state.value}",
+ tags=stats_tags,
+ )
+ # Same metric with tagging
+ Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
if msg:
# If the supervisor rejects the terminal-state report
@@ -1400,7 +1403,7 @@ def run(
except Exception:
log.exception(
"Failed to report terminal task state to supervisor",
- state=state.value,
+ state=state.value if state is not None else None,
)
# Fail closed for FAILED / UP_FOR_RETRY: when the supervisor
# never receives the terminal-state message, exiting 0 would
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index e9dff3cb205..adeba2f49ae 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -4746,6 +4746,43 @@ class TestTriggerDagRunOperator:
]
mock_supervisor_comms.assert_has_calls(expected_calls)
+ @time_machine.travel("2025-01-01 00:00:00", tick=False)
+ def test_handle_trigger_dag_run_reraises_original_error(self, create_runtime_ti, mock_supervisor_comms):
+ """
+ When an ``except`` handler in ``run()`` raises before binding ``state``,
+ the original exception must propagate
+ """
+ from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+
+ class _TriggerSendError(Exception):
+ pass
+
+ task = TriggerDagRunOperator(
+ task_id="test_task",
+ trigger_dag_id="missing_dag",
+ trigger_run_id="test_run_id",
+ )
+ ti = create_runtime_ti(
+ dag_id="test_handle_trigger_dag_run_reraises_original_error",
+ run_id="test_run",
+ task=task,
+ )
+
+ def _send(msg=None, **kwargs):
+ # Fail only the TriggerDagRun comms (the call that would 404 for a
+ # missing DAG); let every earlier send behave as the default mock.
+ if isinstance(msg, TriggerDagRun):
+ raise _TriggerSendError("simulated 404 for missing target DAG")
+ return mock.DEFAULT
+
+ mock_supervisor_comms.send.side_effect = _send
+
+ log = mock.MagicMock()
+
+ # The original error must surface, not UnboundLocalError on ``state``.
+ with pytest.raises(_TriggerSendError):
+ run(ti, ti.get_template_context(), log)
+
@pytest.mark.parametrize(
("allowed_states", "failed_states", "target_dr_state",
"expected_task_state"),
[