This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dcffbb4aff Adding configuration to control retry parameters for k8s 
api client (#29809)
dcffbb4aff is described below

commit dcffbb4aff090e6c7b6dc96a4a68b188424ae174
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Apr 14 19:07:42 2023 +0530

    Adding configuration to control retry parameters for k8s api client (#29809)
    
    * Adding configuration to control retry parameters for k8s api client
    
    * Handling review comments
    
    * Fixing code bug
    
    * Fixing failing tests
    
    * Temporary commit with UT wip
    
    * Fixing unit test
    
    * Fixing the strict checks
    
    * Handling review comments from Hussein
    
    * Revert "Handling review comments from Hussein"
    
    This reverts commit fa3bc260f7462c42620f694ee97b7f15c0b0b9c3.
    
    * Fixing failing ut
    
    * Reverting bad hack
    
    * Updating logic in kube_client.py
    
    Co-authored-by: Hussein Awala <[email protected]>
    
    * Fixing unit tests
    
    * Fixing unit tests
    
    * Handling review comments from Ash
    
    * Fix loading mock call args for python3.7
    
    * Apply suggestions from code review
    
    * fix static check
    
    * add in 2.6.0
    
    ---------
    
    Co-authored-by: Amogh <[email protected]>
    Co-authored-by: Hussein Awala <[email protected]>
---
 airflow/config_templates/config.yml          |  7 +++++++
 airflow/config_templates/default_airflow.cfg |  4 ++++
 airflow/kubernetes/kube_client.py            | 21 +++++++++++++++++----
 tests/kubernetes/test_client.py              | 11 +++++++++++
 4 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index e305268a6b..1d458a8c36 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2565,6 +2565,13 @@ kubernetes_executor:
     previous_name: kubernetes
     version: 2.5.0
   options:
+    api_client_retry_configuration:
+      description: |
+        Kwargs to override the default urllib3 Retry used in the kubernetes 
API client
+      version_added: 2.6.0
+      type: string
+      example: '{ "total": 3, "backoff_factor": 0.5 }'
+      default: ""
     pod_template_file:
       description: |
         Path to the YAML pod file that forms the basis for KubernetesExecutor 
workers.
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index 7f440ec199..69393e014e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1282,6 +1282,10 @@ use_ssl = False
 verify_certs = True
 
 [kubernetes_executor]
+# Kwargs to override the default urllib3 Retry used in the kubernetes API 
client
+# Example: api_client_retry_configuration = {{ "total": 3, "backoff_factor": 
0.5 }}
+api_client_retry_configuration =
+
 # Path to the YAML pod file that forms the basis for KubernetesExecutor 
workers.
 pod_template_file =
 
diff --git a/airflow/kubernetes/kube_client.py 
b/airflow/kubernetes/kube_client.py
index 7e887ae1ac..eb3912db3c 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -19,6 +19,8 @@ from __future__ import annotations
 
 import logging
 
+import urllib3.util
+
 from airflow.configuration import conf
 
 log = logging.getLogger(__name__)
@@ -107,16 +109,27 @@ def get_kube_client(
     if conf.getboolean("kubernetes_executor", "enable_tcp_keepalive"):
         _enable_tcp_keepalive()
 
+    configuration = _get_default_configuration()
+    api_client_retry_configuration = conf.getjson("kubernetes", 
"api_client_retry_configuration", fallback={})
+
+    if not conf.getboolean("kubernetes_executor", "verify_ssl"):
+        _disable_verify_ssl()
+
+    if isinstance(api_client_retry_configuration, dict):
+        configuration.retries = 
urllib3.util.Retry(**api_client_retry_configuration)
+    else:
+        raise ValueError("api_client_retry_configuration should be a 
dictionary")
+
     if in_cluster:
-        config.load_incluster_config()
+        config.load_incluster_config(client_configuration=configuration)
     else:
         if cluster_context is None:
             cluster_context = conf.get("kubernetes_executor", 
"cluster_context", fallback=None)
         if config_file is None:
             config_file = conf.get("kubernetes_executor", "config_file", 
fallback=None)
-        config.load_kube_config(config_file=config_file, 
context=cluster_context)
-
-    configuration = _get_default_configuration()
+        config.load_kube_config(
+            config_file=config_file, context=cluster_context, 
client_configuration=configuration
+        )
 
     if not conf.getboolean("kubernetes_executor", "verify_ssl"):
         configuration.verify_ssl = False
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index 26eda47952..7157653e8c 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -23,6 +23,7 @@ from kubernetes.client import Configuration
 from urllib3.connection import HTTPConnection, HTTPSConnection
 
 from airflow.kubernetes.kube_client import _disable_verify_ssl, 
_enable_tcp_keepalive, get_kube_client
+from tests.test_utils.config import conf_vars
 
 
 class TestClient:
@@ -42,6 +43,7 @@ class TestClient:
     @mock.patch("airflow.kubernetes.kube_client.conf")
     def test_load_config_disable_ssl(self, conf, config):
         conf.getboolean.return_value = False
+        conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5}
         client = get_kube_client(in_cluster=False)
         conf.getboolean.assert_called_with("kubernetes_executor", "verify_ssl")
         assert not client.api_client.configuration.verify_ssl
@@ -50,6 +52,7 @@ class TestClient:
     @mock.patch("airflow.kubernetes.kube_client.conf")
     def test_load_config_ssl_ca_cert(self, conf, config):
         conf.get.return_value = "/path/to/ca.crt"
+        conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5}
         client = get_kube_client(in_cluster=False)
         conf.get.assert_called_with("kubernetes_executor", "ssl_ca_cert")
         assert client.api_client.configuration.ssl_ca_cert == "/path/to/ca.crt"
@@ -81,3 +84,11 @@ class TestClient:
         else:
             configuration = Configuration()
         assert not configuration.verify_ssl
+
+    @mock.patch("kubernetes.config.incluster_config.InClusterConfigLoader")
+    @conf_vars({("kubernetes", "api_client_retry_configuration"): '{"total": 
3, "backoff_factor": 0.5}'})
+    def test_api_client_retry_configuration_correct_values(self, 
mock_in_cluster_loader):
+        get_kube_client(in_cluster=True)
+        client_configuration = 
mock_in_cluster_loader().load_and_set.call_args[0][0]
+        assert client_configuration.retries.total == 3
+        assert client_configuration.retries.backoff_factor == 0.5

Reply via email to