This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 41c49e23904 [v3-2-test] fix: guard the to.finish stats emission (#67955) (#68099)
41c49e23904 is described below

commit 41c49e2390469b6e7320755691aa3786003eed59
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri Jun 5 22:10:04 2026 +0200

    [v3-2-test] fix: guard the to.finish stats emission (#67955) (#68099)
    
    * fix: guard the to.finish stats emission
    
    * fix: guard the to.finish stats emission (fmt)
    
    * fix: prek
    (cherry picked from commit 7e55559406e442dd38f9b17a446f30629127e335)
    
    Co-authored-by: Timothé Barbe <[email protected]>
---
 .../src/airflow/sdk/execution_time/task_runner.py  | 19 ++++++-----
 .../task_sdk/execution_time/test_task_runner.py    | 37 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index b231e902b3e..3fa63f9afae 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1254,7 +1254,7 @@ def run(
     signal.signal(signal.SIGTERM, _on_term)
 
     msg: ToSupervisor | None = None
-    state: TaskInstanceState
+    state: TaskInstanceState | None = None
     error: BaseException | None = None
 
     stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
@@ -1383,12 +1383,15 @@ def run(
         msg, state = _handle_current_task_failed(ti)
         error = e
     finally:
-        Stats.incr(
-            f"ti.finish.{ti.dag_id}.{ti.task_id}.{state.value}",
-            tags=stats_tags,
-        )
-        # Same metric with tagging
-        Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
+        # `state` may still be unset if an exception handler above raised before
+        # binding it
+        if state is not None:
+            Stats.incr(
+                f"ti.finish.{ti.dag_id}.{ti.task_id}.{state.value}",
+                tags=stats_tags,
+            )
+            # Same metric with tagging
+            Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
 
         if msg:
             # If the supervisor rejects the terminal-state report
@@ -1400,7 +1403,7 @@ def run(
             except Exception:
                 log.exception(
                     "Failed to report terminal task state to supervisor",
-                    state=state.value,
+                    state=state.value if state is not None else None,
                 )
                 # Fail closed for FAILED / UP_FOR_RETRY: when the supervisor
                 # never receives the terminal-state message, exiting 0 would
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index e9dff3cb205..adeba2f49ae 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -4746,6 +4746,43 @@ class TestTriggerDagRunOperator:
         ]
         mock_supervisor_comms.assert_has_calls(expected_calls)
 
+    @time_machine.travel("2025-01-01 00:00:00", tick=False)
+    def test_handle_trigger_dag_run_reraises_original_error(self, create_runtime_ti, mock_supervisor_comms):
+        """
+        When an ``except`` handler in ``run()`` raises before binding ``state``,
+        the original exception must propagate
+        """
+        from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
+
+        class _TriggerSendError(Exception):
+            pass
+
+        task = TriggerDagRunOperator(
+            task_id="test_task",
+            trigger_dag_id="missing_dag",
+            trigger_run_id="test_run_id",
+        )
+        ti = create_runtime_ti(
+            dag_id="test_handle_trigger_dag_run_reraises_original_error",
+            run_id="test_run",
+            task=task,
+        )
+
+        def _send(msg=None, **kwargs):
+            # Fail only the TriggerDagRun comms (the call that would 404 for a
+            # missing DAG); let every earlier send behave as the default mock.
+            if isinstance(msg, TriggerDagRun):
+                raise _TriggerSendError("simulated 404 for missing target DAG")
+            return mock.DEFAULT
+
+        mock_supervisor_comms.send.side_effect = _send
+
+        log = mock.MagicMock()
+
+        # The original error must surface, not UnboundLocalError on ``state``.
+        with pytest.raises(_TriggerSendError):
+            run(ti, ti.get_template_context(), log)
+
     @pytest.mark.parametrize(
         ("allowed_states", "failed_states", "target_dr_state", "expected_task_state"),
         [

Reply via email to