This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41e16d5b0a3 fix(k8s-executor): add latency + status metrics around pod API calls (#66806)
41e16d5b0a3 is described below

commit 41e16d5b0a331a96fb320ba8d0ba12bf712f4ff4
Author: Stefan Wang <[email protected]>
AuthorDate: Wed May 13 13:16:21 2026 -0700

    fix(k8s-executor): add latency + status metrics around pod API calls (#66806)
    
    Wrap create/delete/patch pod calls with Stats.timer for latency and
    with Stats.incr counters, tagged by HTTP status, for outcome counts.
    This lets operators alert on slow control-plane calls and on 429/5xx
    error surges instead of inferring them from scheduler log noise.
    
    Closes: #66799
    
    Signed-off-by: 1fanwang <[email protected]>
---
 .../executors/kubernetes_executor_utils.py         | 58 ++++++++++++++--------
 .../executors/test_kubernetes_executor.py          | 49 ++++++++++++++++++
 .../observability/metrics/metrics_template.yaml    | 40 +++++++++++++++
 3 files changed, 126 insertions(+), 21 deletions(-)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index ed31b43bf25..af719ada9e4 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -45,7 +45,7 @@ from 
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
     create_unique_id,
 )
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, 
workload_to_command_args
-from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.sdk import AirflowException, Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import TaskInstanceState
 
@@ -494,11 +494,18 @@ class AirflowKubernetesScheduler(LoggingMixin):
 
         self.log.debug("Pod Creation Request: \n%s", json_pod)
         try:
-            resp = self.kube_client.create_namespaced_pod(
-                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
-            )
+            with Stats.timer("kubernetes_executor.pod_creation"):
+                resp = self.kube_client.create_namespaced_pod(
+                    body=sanitized_pod, namespace=pod.metadata.namespace, 
**kwargs
+                )
+            Stats.incr("kubernetes_executor.pod_creation_status", 
tags={"status": "200"})
             self.log.debug("Pod Creation Response: %s", resp)
+        except ApiException as e:
+            Stats.incr("kubernetes_executor.pod_creation_status", 
tags={"status": str(e.status)})
+            self.log.exception("Exception when attempting to create Namespaced 
Pod: %s", json_pod)
+            raise
         except Exception as e:
+            Stats.incr("kubernetes_executor.pod_creation_status", 
tags={"status": "error"})
             self.log.exception("Exception when attempting to create Namespaced 
Pod: %s", json_pod)
             raise e
         return resp
@@ -608,13 +615,16 @@ class AirflowKubernetesScheduler(LoggingMixin):
         """Delete Pod from a namespace; does not raise if it does not exist."""
         try:
             self.log.info("Deleting pod %s in namespace %s", pod_name, 
namespace)
-            self.kube_client.delete_namespaced_pod(
-                pod_name,
-                namespace,
-                
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
-                **self.kube_config.kube_client_request_args,
-            )
+            with Stats.timer("kubernetes_executor.pod_deletion"):
+                self.kube_client.delete_namespaced_pod(
+                    pod_name,
+                    namespace,
+                    
body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
+                    **self.kube_config.kube_client_request_args,
+                )
+            Stats.incr("kubernetes_executor.pod_deletion_status", 
tags={"status": "200"})
         except ApiException as e:
+            Stats.incr("kubernetes_executor.pod_deletion_status", 
tags={"status": str(e.status)})
             # If the pod is already deleted
             if str(e.status) != "404":
                 raise
@@ -631,24 +641,30 @@ class AirflowKubernetesScheduler(LoggingMixin):
             namespace,
         )
         try:
-            self.kube_client.patch_namespaced_pod(
-                name=pod_name,
-                namespace=namespace,
-                body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}},
-            )
-        except ApiException:
+            with Stats.timer("kubernetes_executor.pod_patching"):
+                self.kube_client.patch_namespaced_pod(
+                    name=pod_name,
+                    namespace=namespace,
+                    body={"metadata": {"labels": {POD_REVOKED_KEY: "True"}}},
+                )
+            Stats.incr("kubernetes_executor.pod_patching_status", 
tags={"status": "200"})
+        except ApiException as e:
+            Stats.incr("kubernetes_executor.pod_patching_status", 
tags={"status": str(e.status)})
             self.log.warning("Failed to patch pod %s with pod revoked key.", 
pod_name, exc_info=True)
 
     def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
         """Add a "done" annotation to ensure we don't continually adopt 
pods."""
         self.log.debug("Patching pod %s in namespace %s to mark it as done", 
pod_name, namespace)
         try:
-            self.kube_client.patch_namespaced_pod(
-                name=pod_name,
-                namespace=namespace,
-                body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
-            )
+            with Stats.timer("kubernetes_executor.pod_patching"):
+                self.kube_client.patch_namespaced_pod(
+                    name=pod_name,
+                    namespace=namespace,
+                    body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: 
"True"}}},
+                )
+            Stats.incr("kubernetes_executor.pod_patching_status", 
tags={"status": "200"})
         except ApiException as e:
+            Stats.incr("kubernetes_executor.pod_patching_status", 
tags={"status": str(e.status)})
             self.log.info("Failed to patch pod %s with done annotation. 
Reason: %s", pod_name, e)
 
     def sync(self) -> None:
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 9805670ec81..354fdbf05c9 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -244,6 +244,55 @@ class TestAirflowKubernetesScheduler:
         finally:
             kube_executor.end()
 
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason="kubernetes python package 
is not installed"
+    )
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats")
+    
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+    def test_delete_pod_emits_metrics_on_success(
+        self, mock_watcher, mock_client, mock_kube_client, mock_stats
+    ):
+        pod_name = "my-pod-1"
+        namespace = "my-namespace-1"
+        mock_kube_client.return_value.delete_namespaced_pod = mock.MagicMock()
+
+        kube_executor = KubernetesExecutor()
+        kube_executor.job_id = 1
+        kube_executor.start()
+        try:
+            kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+            
mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion")
+            
mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status", 
tags={"status": "200"})
+        finally:
+            kube_executor.end()
+
+    @pytest.mark.skipif(
+        AirflowKubernetesScheduler is None, reason="kubernetes python package 
is not installed"
+    )
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.Stats")
+    
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.client")
+    
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+    def test_delete_pod_emits_metrics_on_failure(
+        self, mock_watcher, mock_client, mock_kube_client, mock_stats
+    ):
+        pod_name = "my-pod-1"
+        namespace = "my-namespace-2"
+        mock_kube_client.return_value.delete_namespaced_pod.side_effect = 
ApiException(status=429)
+
+        kube_executor = KubernetesExecutor()
+        kube_executor.job_id = 1
+        kube_executor.start()
+        try:
+            with pytest.raises(ApiException):
+                kube_executor.kube_scheduler.delete_pod(pod_name, namespace)
+            
mock_stats.timer.assert_any_call("kubernetes_executor.pod_deletion")
+            
mock_stats.incr.assert_any_call("kubernetes_executor.pod_deletion_status", 
tags={"status": "429"})
+        finally:
+            kube_executor.end()
+
     def test_running_pod_log_lines(self):
         # default behaviour
         kube_executor = KubernetesExecutor()
diff --git 
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
 
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index fa6111ef54a..876cc3cc895 100644
--- 
a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++ 
b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -534,6 +534,28 @@ metrics:
     legacy_name: "edge_worker.ti.finish.{queue}.{state}.{dag_id}.{task_id}"
     name_variables: ["queue", "state", "dag_id", "task_id"]
 
+  - name: "kubernetes_executor.pod_creation_status"
+    description: "Number of Kubernetes create_namespaced_pod calls from the 
Kubernetes Executor,
+    tagged by HTTP response status (``200`` on success, the ``ApiException`` 
status code on
+    failure, ``error`` for non-API exceptions)."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: ["status"]
+
+  - name: "kubernetes_executor.pod_deletion_status"
+    description: "Number of Kubernetes delete_namespaced_pod calls from the 
Kubernetes Executor,
+    tagged by HTTP response status (``200`` on success, the ``ApiException`` 
status code on failure)."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: ["status"]
+
+  - name: "kubernetes_executor.pod_patching_status"
+    description: "Number of Kubernetes patch_namespaced_pod calls from the 
Kubernetes Executor,
+    tagged by HTTP response status (``200`` on success, the ``ApiException`` 
status code on failure)."
+    type: "counter"
+    legacy_name: "-"
+    name_variables: ["status"]
+
   # ==========
   # Timers
   # ==========
@@ -641,6 +663,24 @@ metrics:
     legacy_name: "-"
     name_variables: []
 
+  - name: "kubernetes_executor.pod_creation"
+    description: "Milliseconds taken for a Kubernetes create_namespaced_pod 
call from the Kubernetes Executor"
+    type: "timer"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "kubernetes_executor.pod_deletion"
+    description: "Milliseconds taken for a Kubernetes delete_namespaced_pod 
call from the Kubernetes Executor"
+    type: "timer"
+    legacy_name: "-"
+    name_variables: []
+
+  - name: "kubernetes_executor.pod_patching"
+    description: "Milliseconds taken for a Kubernetes patch_namespaced_pod 
call from the Kubernetes Executor"
+    type: "timer"
+    legacy_name: "-"
+    name_variables: []
+
   - name: "batch_executor.adopt_task_instances.duration"
     description: "Milliseconds taken to adopt the task instances in the AWS 
Batch Executor"
     type: "timer"

Reply via email to