This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dcffbb4aff Adding configuration to control retry parameters for k8s api client (#29809)
dcffbb4aff is described below
commit dcffbb4aff090e6c7b6dc96a4a68b188424ae174
Author: Amogh Desai <[email protected]>
AuthorDate: Fri Apr 14 19:07:42 2023 +0530
Adding configuration to control retry parameters for k8s api client (#29809)
* Adding configuration to control retry parameters for k8s api client
* Handling review comments
* Fixing code bug
* Fixing failing tests
* Temporary commit with UT wip
* Fixing unit test
* Fixing the strict checks
* Handling review comments from Hussein
* Revert "Handling review comments from Hussein"
This reverts commit fa3bc260f7462c42620f694ee97b7f15c0b0b9c3.
* Fixing failing ut
* Reverting bad hack
* Updating logic in kube_client.py
Co-authored-by: Hussein Awala <[email protected]>
* Fixing unit tests
* Fixing unit tests
* Handling review comments from Ash
* Fix loading mock call args for python3.7
* Apply suggestions from code review
* fix static check
* add in 2.6.0
---------
Co-authored-by: Amogh <[email protected]>
Co-authored-by: Hussein Awala <[email protected]>
---
airflow/config_templates/config.yml | 7 +++++++
airflow/config_templates/default_airflow.cfg | 4 ++++
airflow/kubernetes/kube_client.py | 21 +++++++++++++++++----
tests/kubernetes/test_client.py | 11 +++++++++++
4 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index e305268a6b..1d458a8c36 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2565,6 +2565,13 @@ kubernetes_executor:
previous_name: kubernetes
version: 2.5.0
options:
+ api_client_retry_configuration:
+ description: |
+ Kwargs to override the default urllib3 Retry used in the kubernetes API client
+ version_added: 2.6.0
+ type: string
+ example: '{ "total": 3, "backoff_factor": 0.5 }'
+ default: ""
pod_template_file:
description: |
Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7f440ec199..69393e014e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1282,6 +1282,10 @@ use_ssl = False
verify_certs = True
[kubernetes_executor]
+# Kwargs to override the default urllib3 Retry used in the kubernetes API client
+# Example: api_client_retry_configuration = {{ "total": 3, "backoff_factor": 0.5 }}
+api_client_retry_configuration =
+
# Path to the YAML pod file that forms the basis for KubernetesExecutor workers.
pod_template_file =
diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index 7e887ae1ac..eb3912db3c 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -19,6 +19,8 @@ from __future__ import annotations
import logging
+import urllib3.util
+
from airflow.configuration import conf
log = logging.getLogger(__name__)
@@ -107,16 +109,27 @@ def get_kube_client(
if conf.getboolean("kubernetes_executor", "enable_tcp_keepalive"):
_enable_tcp_keepalive()
+ configuration = _get_default_configuration()
+ api_client_retry_configuration = conf.getjson("kubernetes", "api_client_retry_configuration", fallback={})
+
+ if not conf.getboolean("kubernetes_executor", "verify_ssl"):
+ _disable_verify_ssl()
+
+ if isinstance(api_client_retry_configuration, dict):
+ configuration.retries = urllib3.util.Retry(**api_client_retry_configuration)
+ else:
+ raise ValueError("api_client_retry_configuration should be a dictionary")
+
if in_cluster:
- config.load_incluster_config()
+ config.load_incluster_config(client_configuration=configuration)
else:
if cluster_context is None:
cluster_context = conf.get("kubernetes_executor", "cluster_context", fallback=None)
if config_file is None:
config_file = conf.get("kubernetes_executor", "config_file", fallback=None)
- config.load_kube_config(config_file=config_file, context=cluster_context)
-
- configuration = _get_default_configuration()
+ config.load_kube_config(
+ config_file=config_file, context=cluster_context, client_configuration=configuration
+ )
if not conf.getboolean("kubernetes_executor", "verify_ssl"):
configuration.verify_ssl = False
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index 26eda47952..7157653e8c 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -23,6 +23,7 @@ from kubernetes.client import Configuration
from urllib3.connection import HTTPConnection, HTTPSConnection
from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
+from tests.test_utils.config import conf_vars
class TestClient:
@@ -42,6 +43,7 @@ class TestClient:
@mock.patch("airflow.kubernetes.kube_client.conf")
def test_load_config_disable_ssl(self, conf, config):
conf.getboolean.return_value = False
+ conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5}
client = get_kube_client(in_cluster=False)
conf.getboolean.assert_called_with("kubernetes_executor", "verify_ssl")
assert not client.api_client.configuration.verify_ssl
@@ -50,6 +52,7 @@ class TestClient:
@mock.patch("airflow.kubernetes.kube_client.conf")
def test_load_config_ssl_ca_cert(self, conf, config):
conf.get.return_value = "/path/to/ca.crt"
+ conf.getjson.return_value = {"total": 3, "backoff_factor": 0.5}
client = get_kube_client(in_cluster=False)
conf.get.assert_called_with("kubernetes_executor", "ssl_ca_cert")
assert client.api_client.configuration.ssl_ca_cert == "/path/to/ca.crt"
@@ -81,3 +84,11 @@ class TestClient:
else:
configuration = Configuration()
assert not configuration.verify_ssl
+
+ @mock.patch("kubernetes.config.incluster_config.InClusterConfigLoader")
+ @conf_vars({("kubernetes", "api_client_retry_configuration"): '{"total": 3, "backoff_factor": 0.5}'})
+ def test_api_client_retry_configuration_correct_values(self, mock_in_cluster_loader):
+ get_kube_client(in_cluster=True)
+ client_configuration = mock_in_cluster_loader().load_and_set.call_args[0][0]
+ assert client_configuration.retries.total == 3
+ assert client_configuration.retries.backoff_factor == 0.5