This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 41e16d5b0a3 fix(k8s-executor): add latency + status metrics around pod API calls (#66806)
41e16d5b0a3 is described below
commit 41e16d5b0a331a96fb320ba8d0ba12bf712f4ff4
Author: Stefan Wang <[email protected]>
AuthorDate: Wed May 13 13:16:21 2026 -0700
fix(k8s-executor): add latency + status metrics around pod API calls (#66806)
Wrap create/delete/patch pod calls with Stats.timer for latency and
Stats.incr tagged by HTTP status for outcome counts (successful calls are
tagged "200"; a failed pod creation that is not an ApiException is tagged
"error"). This lets operators alert on slow control-plane calls and on
429/5xx error surges instead of inferring them from scheduler log noise.
Closes: #66799
Signed-off-by: 1fanwang <[email protected]>
---
.../executors/kubernetes_executor_utils.py | 58 ++++++++++++++--------
.../executors/test_kubernetes_executor.py | 49 ++++++++++++++++++
.../observability/metrics/metrics_template.yaml | 40 +++++++++++++++
3 files changed, 126 insertions(+), 21 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index ed31b43bf25..af719ada9e4 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -45,7 +45,7 @@ from
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
create_unique_id,
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator,
workload_to_command_args
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -494,11 +494,18 @@ class AirflowKubernetesScheduler(LoggingMixin):
self.log.debug("Pod Creation Request: \n%s", json_pod)
try:
- resp = self.kube_client.create_namespaced_pod(
- body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
- )
+ with Stats.timer("kubernetes_executor.pod_creation"):
+ resp = self.kube_client.create_namespaced_pod(
+ body=sanitized_pod, namespace=pod.metadata.namespace,
**kwargs
+ )
+ Stats.incr("kubernetes_executor.pod_creation_status",
tags={"status": "200"})
self.log.debug("Pod Creation Response: %s", resp)
+ except ApiException as e:
+ Stats.incr("kubernetes_executor.pod_creation_status",
tags={"status": str(e.status)})
+ self.log.exception("Exception when attempting to create Namespaced
Pod: %s", json_pod)
+ raise
except Exception as e:
+ Stats.incr("kubernetes_executor.pod_creation_status",
tags={"status": "error"})
self.log.exception("Exception when attempting to create Namespaced
Pod: %s", json_pod)
raise e
return resp
@@ -608,13 +615,16 @@ class AirflowKubernetesScheduler(LoggingMixin):
"""Delete Pod from a namespace; does not raise if it does not exist."""
try:
self.log.info("Deleting pod %s in namespace %s", pod_name,
namespace)
- self.kube_client.delete_namespaced_pod(
- pod_name,
- namespace,
-
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
- **self.kube_config.kube_client_request_args,
- )
+ with Stats.timer("kubernetes_executor.pod_deletion"):
+ self.kube_client.delete_namespaced_pod(
+ pod_name,
+ namespace,
+
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
+ **self.kube_config.kube_client_request_args,
+ )
+ Stats.incr("kubernetes_executor.pod_deletion_status",
tags={"status": "200"})
except ApiException as e:
+ Stats.incr("kubernetes_executor.pod_deletion_status",
tags={"status": str(e.status)})
# If the pod is already deleted
if str(e.status) != "404":
raise
@@ -631,24 +641,30 @@ class AirflowKubernetesScheduler(LoggingMixin):
namespace,
)
try:
- self.kube_client.patch_namespaced_pod(
- name=pod_name,
- namespace=namespace,
- body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}},
- )
- except ApiException:
+ with Stats.timer("kubernetes_executor.pod_patching"):
+ self.kube_client.patch_namespaced_pod(
+ name=pod_name,
+ namespace=namespace,
+ body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}},
+ )
+ Stats.incr("kubernetes_executor.pod_patching_status",
tags={"status": "200"})
+ except ApiException as e:
+ Stats.incr("kubernetes_executor.pod_patching_status",
tags={"status": str(e.status)})
self.log.warning("Failed to patch pod %s with pod revoked key.",
pod_name, exc_info=True)
def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
"""Add a "done" annotation to ensure we don't continually adopt
pods."""
self.log.debug("Patching pod %s in namespace %s to mark it as done",
pod_name, namespace)
try:
- self.kube_client.patch_namespaced_pod(
- name=pod_name,
- namespace=namespace,
- body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
- )
+ with Stats.timer("kubernetes_executor.pod_patching"):
+ self.kube_client.patch_namespaced_pod(
+ name=pod_name,
+ namespace=namespace,
+ body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY:
"True"}}},
+ )
+ Stats.incr("kubernetes_executor.pod_patching_status",
tags={"status": "200"})
except ApiException as e:
+ Stats.incr("kubernetes_executor.pod_patching_status",
tags={"status": str(e.status)})
self.log.info("Failed to patch pod %s with done annotation.
Reason: %s", pod_name, e)
def sync(self) -> None:
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 9805670ec81..354fdbf05c9 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -244,6 +244,55 @@ class TestAirflowKubernetesScheduler:
finally:
kube_executor.end()
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason="kubernetes python package
is not installed"
+ )
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats")
+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ def test_delete_pod_emits_metrics_on_success(
+ self, mock_watcher, mock_client, mock_kube_client, mock_stats
+ ):
+ pod_name = "my-pod-1"
+ namespace = "my-namespace-1"
+ mock_kube_client.return_value.delete_namespaced_pod = mock.MagicMock()
+
+ kube_executor = KubernetesExecutor()
+ kube_executor.job_id = 1
+ kube_executor.start()
+ try:
+ kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+
mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion")
+
mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status",
tags={"status": "200"})
+ finally:
+ kube_executor.end()
+
+ @pytest.mark.skipif(
+ AirflowKubernetesScheduler is None, reason="kubernetes python package
is not installed"
+ )
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats")
+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
+
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+ def test_delete_pod_emits_metrics_on_failure(
+ self, mock_watcher, mock_client, mock_kube_client, mock_stats
+ ):
+ pod_name = "my-pod-1"
+ namespace = "my-namespace-2"
+ mock_kube_client.return_value.delete_namespaced_pod.side_effect =
ApiException(status=429)
+
+ kube_executor = KubernetesExecutor()
+ kube_executor.job_id = 1
+ kube_executor.start()
+ try:
+ with pytest.raises(ApiException):
+ kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+
mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion")
+
mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status",
tags={"status": "429"})
+ finally:
+ kube_executor.end()
+
def test_running_pod_log_lines(self):
# default behaviour
kube_executor = KubernetesExecutor()
diff --git
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index fa6111ef54a..876cc3cc895 100644
---
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -534,6 +534,28 @@ metrics:
legacy_name: "edge_worker.ti.finish.{queue}.{state}.{dag_id}.{task_id}"
name_variables: ["queue", "state", "dag_id", "task_id"]
+ - name: "kubernetes_executor.pod_creation_status"
+ description: "Number of Kubernetes create_namespaced_pod calls from the
Kubernetes Executor,
+ tagged by HTTP response status (``200`` on success, the ``ApiException``
status code on
+ failure, ``error`` for non-API exceptions)."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: ["status"]
+
+ - name: "kubernetes_executor.pod_deletion_status"
+ description: "Number of Kubernetes delete_namespaced_pod calls from the
Kubernetes Executor,
+ tagged by HTTP response status (``200`` on success, the ``ApiException``
status code on failure)."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: ["status"]
+
+ - name: "kubernetes_executor.pod_patching_status"
+ description: "Number of Kubernetes patch_namespaced_pod calls from the
Kubernetes Executor,
+ tagged by HTTP response status (``200`` on success, the ``ApiException``
status code on failure)."
+ type: "counter"
+ legacy_name: "-"
+ name_variables: ["status"]
+
# ==========
# Timers
# ==========
@@ -641,6 +663,24 @@ metrics:
legacy_name: "-"
name_variables: []
+ - name: "kubernetes_executor.pod_creation"
+ description: "Milliseconds taken for a Kubernetes create_namespaced_pod
call from the Kubernetes Executor"
+ type: "timer"
+ legacy_name: "-"
+ name_variables: []
+
+ - name: "kubernetes_executor.pod_deletion"
+ description: "Milliseconds taken for a Kubernetes delete_namespaced_pod
call from the Kubernetes Executor"
+ type: "timer"
+ legacy_name: "-"
+ name_variables: []
+
+ - name: "kubernetes_executor.pod_patching"
+ description: "Milliseconds taken for a Kubernetes patch_namespaced_pod
call from the Kubernetes Executor"
+ type: "timer"
+ legacy_name: "-"
+ name_variables: []
+
- name: "batch_executor.adopt_task_instances.duration"
description: "Milliseconds taken to adopt the task instances in the AWS
Batch Executor"
type: "timer"