This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new a173777b9d8 [v3-1-test] Add missing ti.start and ti.finish metrics in 
Airflow 3 (#62019) (#62110)
a173777b9d8 is described below

commit a173777b9d8cdc3cb92822ba4b140833329097f5
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 18 14:24:22 2026 +0530

    [v3-1-test] Add missing ti.start and ti.finish metrics in Airflow 3 
(#62019) (#62110)
    
    (cherry picked from commit 8d8048c1699c49fbcf0aa1c8130799d0ff6aa2f7)
    
    Co-authored-by: Amogh Desai <[email protected]>
---
 .../src/airflow/sdk/execution_time/task_runner.py  | 12 +++
 .../task_sdk/execution_time/test_task_runner.py    | 89 ++++++++++++++++++++++
 2 files changed, 101 insertions(+)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index a9a3b28cfc9..94202e01efb 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1054,6 +1054,11 @@ def run(
     state: TaskInstanceState
     error: BaseException | None = None
 
+    stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
+    Stats.incr(f"ti.start.{ti.dag_id}.{ti.task_id}", tags=stats_tags)
+    # Same metric with tagging
+    Stats.incr("ti.start", tags=stats_tags)
+
     try:
         # First, clear the xcom data sent from server
         if ti._ti_context_from_server and (keys_to_delete := 
ti._ti_context_from_server.xcom_keys_to_clear):
@@ -1154,6 +1159,13 @@ def run(
         msg, state = _handle_current_task_failed(ti)
         error = e
     finally:
+        Stats.incr(
+            f"ti.finish.{ti.dag_id}.{ti.task_id}.{state.value}",
+            tags=stats_tags,
+        )
+        # Same metric with tagging
+        Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
+
         if msg:
             SUPERVISOR_COMMS.send(msg=msg)
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 7df042ca64f..51d31d2a0fa 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -3886,3 +3886,92 @@ class TestTriggerDagRunOperator:
 
         # Also verify it was sent to supervisor
         mock_supervisor_comms.send.assert_any_call(msg)
+
+
+class TestTaskInstanceMetrics:
+    def test_ti_start_metric_emitted(self, create_runtime_ti, 
mock_supervisor_comms):
+        """Test that ti.start metric is emitted at the beginning of task."""
+        task = PythonOperator(task_id="test", python_callable=lambda: 
"success")
+        ti = create_runtime_ti(task=task)
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as 
mock_stats:
+            run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+            # verify ti.start was called in legacy format
+            mock_stats.incr.assert_any_call(
+                f"ti.start.{ti.dag_id}.{ti.task_id}",
+                tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+            )
+            # verify ti.start was called in tagged format
+            mock_stats.incr.assert_any_call(
+                "ti.start",
+                tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+            )
+
+    @pytest.mark.parametrize(
+        ("task_callable", "expected_state"),
+        [
+            pytest.param(lambda: "success", "success", id="success"),
+            pytest.param(lambda: (_ for _ in 
()).throw(AirflowSkipException()), "skipped", id="skipped"),
+            pytest.param(lambda: (_ for _ in 
()).throw(AirflowFailException("fail")), "failed", id="failed"),
+            pytest.param(lambda: 1 / 0, "failed", id="zero_division"),
+        ],
+    )
+    def test_ti_finish_metric_emitted_for_terminal_states(
+        self, task_callable, expected_state, create_runtime_ti, 
mock_supervisor_comms
+    ):
+        """Test that ti.finish metric is emitted for all terminal states."""
+        task = PythonOperator(task_id="test", python_callable=task_callable)
+        ti = create_runtime_ti(task=task)
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as 
mock_stats:
+            run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+            # verify ti.finish was called in legacy format
+            mock_stats.incr.assert_any_call(
+                f"ti.finish.{ti.dag_id}.{ti.task_id}.{expected_state}",
+                tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+            )
+            # verify ti.finish was called in tagged format
+            mock_stats.incr.assert_any_call(
+                "ti.finish",
+                tags={"dag_id": ti.dag_id, "task_id": ti.task_id, "state": 
expected_state},
+            )
+
+    def test_operator_successes_metrics_emitted(self, create_runtime_ti, 
mock_supervisor_comms):
+        """Test that operator_successes and ti_successes metrics are emitted 
on task success."""
+        task = PythonOperator(task_id="test", python_callable=lambda: 
"success")
+        ti = create_runtime_ti(task=task)
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as 
mock_stats:
+            run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+            stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
+
+            # verify operator_successes in legacy format
+            
mock_stats.incr.assert_any_call("operator_successes_PythonOperator", 
tags=stats_tags)
+            # verify operator_successes in tagged format
+            mock_stats.incr.assert_any_call(
+                "operator_successes",
+                tags={**stats_tags, "operator": "PythonOperator"},
+            )
+            mock_stats.incr.assert_any_call("ti_successes", tags=stats_tags)
+
+    def test_operator_failures_metrics_emitted(self, create_runtime_ti, 
mock_supervisor_comms):
+        """Test that operator_failures and ti_failures metrics are emitted on 
task failure."""
+        task = PythonOperator(task_id="test", python_callable=lambda: 1 / 0)
+        ti = create_runtime_ti(task=task)
+
+        with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as 
mock_stats:
+            run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+            stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
+
+            # verify operator_failures in legacy format
+            
mock_stats.incr.assert_any_call("operator_failures_PythonOperator", 
tags=stats_tags)
+            # verify operator_failures in tagged format
+            mock_stats.incr.assert_any_call(
+                "operator_failures",
+                tags={**stats_tags, "operator": "PythonOperator"},
+            )
+            mock_stats.incr.assert_any_call("ti_failures", tags=stats_tags)

Reply via email to