This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new a173777b9d8 [v3-1-test] Add missing ti.start and ti.finish metrics in
Airflow 3 (#62019) (#62110)
a173777b9d8 is described below
commit a173777b9d8cdc3cb92822ba4b140833329097f5
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 18 14:24:22 2026 +0530
[v3-1-test] Add missing ti.start and ti.finish metrics in Airflow 3
(#62019) (#62110)
(cherry picked from commit 8d8048c1699c49fbcf0aa1c8130799d0ff6aa2f7)
Co-authored-by: Amogh Desai <[email protected]>
---
.../src/airflow/sdk/execution_time/task_runner.py | 12 +++
.../task_sdk/execution_time/test_task_runner.py | 89 ++++++++++++++++++++++
2 files changed, 101 insertions(+)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index a9a3b28cfc9..94202e01efb 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1054,6 +1054,11 @@ def run(
state: TaskInstanceState
error: BaseException | None = None
+ stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
+ Stats.incr(f"ti.start.{ti.dag_id}.{ti.task_id}", tags=stats_tags)
+ # Same metric with tagging
+ Stats.incr("ti.start", tags=stats_tags)
+
try:
# First, clear the xcom data sent from server
if ti._ti_context_from_server and (keys_to_delete :=
ti._ti_context_from_server.xcom_keys_to_clear):
@@ -1154,6 +1159,13 @@ def run(
msg, state = _handle_current_task_failed(ti)
error = e
finally:
+ Stats.incr(
+ f"ti.finish.{ti.dag_id}.{ti.task_id}.{state.value}",
+ tags=stats_tags,
+ )
+ # Same metric with tagging
+ Stats.incr("ti.finish", tags={**stats_tags, "state": state.value})
+
if msg:
SUPERVISOR_COMMS.send(msg=msg)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 7df042ca64f..51d31d2a0fa 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -3886,3 +3886,92 @@ class TestTriggerDagRunOperator:
# Also verify it was sent to supervisor
mock_supervisor_comms.send.assert_any_call(msg)
+
+
+class TestTaskInstanceMetrics:
+ def test_ti_start_metric_emitted(self, create_runtime_ti,
mock_supervisor_comms):
+ """Test that ti.start metric is emitted at the beginning of task."""
+ task = PythonOperator(task_id="test", python_callable=lambda:
"success")
+ ti = create_runtime_ti(task=task)
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as
mock_stats:
+ run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+ # verify ti.start was called in legacy format
+ mock_stats.incr.assert_any_call(
+ f"ti.start.{ti.dag_id}.{ti.task_id}",
+ tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+ )
+ # verify ti.start was called in tagged format
+ mock_stats.incr.assert_any_call(
+ "ti.start",
+ tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+ )
+
+ @pytest.mark.parametrize(
+ ("task_callable", "expected_state"),
+ [
+ pytest.param(lambda: "success", "success", id="success"),
+ pytest.param(lambda: (_ for _ in
()).throw(AirflowSkipException()), "skipped", id="skipped"),
+ pytest.param(lambda: (_ for _ in
()).throw(AirflowFailException("fail")), "failed", id="failed"),
+ pytest.param(lambda: 1 / 0, "failed", id="zero_division"),
+ ],
+ )
+ def test_ti_finish_metric_emitted_for_terminal_states(
+ self, task_callable, expected_state, create_runtime_ti,
mock_supervisor_comms
+ ):
+ """Test that ti.finish metric is emitted for all terminal states."""
+ task = PythonOperator(task_id="test", python_callable=task_callable)
+ ti = create_runtime_ti(task=task)
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as
mock_stats:
+ run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+ # verify ti.finish was called in legacy format
+ mock_stats.incr.assert_any_call(
+ f"ti.finish.{ti.dag_id}.{ti.task_id}.{expected_state}",
+ tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
+ )
+ # verify ti.finish was called in tagged format
+ mock_stats.incr.assert_any_call(
+ "ti.finish",
+ tags={"dag_id": ti.dag_id, "task_id": ti.task_id, "state":
expected_state},
+ )
+
+ def test_operator_successes_metrics_emitted(self, create_runtime_ti,
mock_supervisor_comms):
+ """Test that operator_successes and ti_successes metrics are emitted
on task success."""
+ task = PythonOperator(task_id="test", python_callable=lambda:
"success")
+ ti = create_runtime_ti(task=task)
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as
mock_stats:
+ run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+ stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
+
+ # verify operator_successes in legacy format
+
mock_stats.incr.assert_any_call("operator_successes_PythonOperator",
tags=stats_tags)
+ # verify operator_successes in tagged format
+ mock_stats.incr.assert_any_call(
+ "operator_successes",
+ tags={**stats_tags, "operator": "PythonOperator"},
+ )
+ mock_stats.incr.assert_any_call("ti_successes", tags=stats_tags)
+
+ def test_operator_failures_metrics_emitted(self, create_runtime_ti,
mock_supervisor_comms):
+ """Test that operator_failures and ti_failures metrics are emitted on
task failure."""
+ task = PythonOperator(task_id="test", python_callable=lambda: 1 / 0)
+ ti = create_runtime_ti(task=task)
+
+ with mock.patch("airflow.sdk.execution_time.task_runner.Stats") as
mock_stats:
+ run(ti, context=ti.get_template_context(), log=mock.MagicMock())
+
+ stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id}
+
+ # verify operator_failures in legacy format
+
mock_stats.incr.assert_any_call("operator_failures_PythonOperator",
tags=stats_tags)
+ # verify operator_failures in tagged format
+ mock_stats.incr.assert_any_call(
+ "operator_failures",
+ tags={**stats_tags, "operator": "PythonOperator"},
+ )
+ mock_stats.incr.assert_any_call("ti_failures", tags=stats_tags)