This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b5296b7436 add missing read for K8S config file from conn in deferred
`KubernetesPodOperator` (#29498)
b5296b7436 is described below
commit b5296b74361bfe2449033eca5f732c4a4377f6bb
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Apr 22 19:30:42 2023 +0200
add missing read for K8S config file from conn in deferred
`KubernetesPodOperator` (#29498)
* restore convert_config_file_to_dict method and deprecate it
---
airflow/providers/cncf/kubernetes/hooks/kubernetes.py | 18 +++++++++++-------
airflow/providers/cncf/kubernetes/operators/pod.py | 11 +++++++----
airflow/providers/cncf/kubernetes/triggers/pod.py | 11 +++++------
tests/providers/cncf/kubernetes/operators/test_pod.py | 3 +--
tests/providers/cncf/kubernetes/triggers/test_pod.py | 6 +++---
.../google/cloud/operators/test_kubernetes_engine.py | 3 +--
6 files changed, 28 insertions(+), 24 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 91c00f483c..4b6f436327 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -467,19 +467,18 @@ def _get_bool(val) -> bool | None:
class AsyncKubernetesHook(KubernetesHook):
"""Hook to use Kubernetes SDK asynchronously."""
- def __init__(self, config_dict: dict | None = None, *args, **kwargs):
+ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- self.config_dict = config_dict
-
self._extras: dict | None = None
async def _load_config(self):
"""Returns Kubernetes API session for use with requests"""
in_cluster = self._coalesce_param(self.in_cluster, await
self._get_field("in_cluster"))
cluster_context = self._coalesce_param(self.cluster_context, await
self._get_field("cluster_context"))
+ kubeconfig_path = self._coalesce_param(self.config_file, await
self._get_field("kube_config_path"))
kubeconfig = await self._get_field("kube_config")
- num_selected_configuration = len([o for o in [in_cluster, kubeconfig,
self.config_dict] if o])
+ num_selected_configuration = len([o for o in [in_cluster, kubeconfig,
kubeconfig_path] if o])
if num_selected_configuration > 1:
raise AirflowException(
@@ -494,9 +493,14 @@ class AsyncKubernetesHook(KubernetesHook):
async_config.load_incluster_config()
return async_client.ApiClient()
- if self.config_dict:
- self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config
dictionary"))
- await async_config.load_kube_config_from_dict(self.config_dict)
+ if kubeconfig_path:
+
self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("kube_config"))
+ self._is_in_cluster = False
+ await async_config.load_kube_config(
+ config_file=kubeconfig_path,
+ client_configuration=self.client_configuration,
+ context=cluster_context,
+ )
return async_client.ApiClient()
if kubeconfig is not None:
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py
b/airflow/providers/cncf/kubernetes/operators/pod.py
index cec66dfe8c..6f5a3db603 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -373,8 +373,7 @@ class KubernetesPodOperator(BaseOperator):
self.deferrable = deferrable
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
-
- self._config_dict: dict | None = None
+ self._config_dict: dict | None = None # TODO: remove it when removing
convert_config_file_to_dict
@cached_property
def _incluster_namespace(self):
@@ -572,11 +571,15 @@ class KubernetesPodOperator(BaseOperator):
pod_request_obj=self.pod_request_obj,
context=context,
)
- self.convert_config_file_to_dict()
self.invoke_defer_method()
def convert_config_file_to_dict(self):
"""Converts passed config_file to dict format."""
+ warnings.warn(
+ "This method is deprecated and will be removed in a future
version.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
config_file = self.config_file if self.config_file else
os.environ.get(KUBE_CONFIG_ENV_VAR)
if config_file:
with open(config_file) as f:
@@ -594,7 +597,7 @@ class KubernetesPodOperator(BaseOperator):
trigger_start_time=trigger_start_time,
kubernetes_conn_id=self.kubernetes_conn_id,
cluster_context=self.cluster_context,
- config_dict=self._config_dict,
+ config_file=self.config_file,
in_cluster=self.in_cluster,
poll_interval=self.poll_interval,
should_delete_pod=self.is_delete_operator_pod,
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py
b/airflow/providers/cncf/kubernetes/triggers/pod.py
index b4a8816253..faac3e30bc 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -52,8 +52,7 @@ class KubernetesPodTrigger(BaseTrigger):
:param kubernetes_conn_id: The :ref:`kubernetes connection id
<howto/connection:kubernetes>`
for the Kubernetes cluster.
:param cluster_context: Context that points to kubernetes cluster.
- :param config_dict: Kubernetes config file content in dict format. If not
specified,
- default value is ``~/.kube/config``
+ :param config_file: Path to kubeconfig file.
:param poll_interval: Polling period in seconds to check for the status.
:param trigger_start_time: time in Datetime format when the trigger was
started
:param in_cluster: run kubernetes client with in_cluster configuration.
@@ -73,7 +72,7 @@ class KubernetesPodTrigger(BaseTrigger):
kubernetes_conn_id: str | None = None,
poll_interval: float = 2,
cluster_context: str | None = None,
- config_dict: dict | None = None,
+ config_file: str | None = None,
in_cluster: bool | None = None,
should_delete_pod: bool = True,
get_logs: bool = True,
@@ -87,7 +86,7 @@ class KubernetesPodTrigger(BaseTrigger):
self.kubernetes_conn_id = kubernetes_conn_id
self.poll_interval = poll_interval
self.cluster_context = cluster_context
- self.config_dict = config_dict
+ self.config_file = config_file
self.in_cluster = in_cluster
self.should_delete_pod = should_delete_pod
self.get_logs = get_logs
@@ -107,7 +106,7 @@ class KubernetesPodTrigger(BaseTrigger):
"kubernetes_conn_id": self.kubernetes_conn_id,
"poll_interval": self.poll_interval,
"cluster_context": self.cluster_context,
- "config_dict": self.config_dict,
+ "config_file": self.config_file,
"in_cluster": self.in_cluster,
"should_delete_pod": self.should_delete_pod,
"get_logs": self.get_logs,
@@ -215,7 +214,7 @@ class KubernetesPodTrigger(BaseTrigger):
self._hook = AsyncKubernetesHook(
conn_id=self.kubernetes_conn_id,
in_cluster=self.in_cluster,
- config_dict=self.config_dict,
+ config_file=self.config_file,
cluster_context=self.cluster_context,
)
return self._hook
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py
b/tests/providers/cncf/kubernetes/operators/test_pod.py
index 0e1bcfdbe0..f3fd713f9b 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1230,10 +1230,9 @@ class TestKubernetesPodOperatorAsync:
)
return remote_pod_mock
- @patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
@patch(KUB_OP_PATH.format("build_pod_request_obj"))
@patch(KUB_OP_PATH.format("get_or_create_pod"))
- def test_async_create_pod_should_execute_successfully(self, mocked_pod,
mocked_pod_obj, mocked_conf_file):
+ def test_async_create_pod_should_execute_successfully(self, mocked_pod,
mocked_pod_obj):
"""
Asserts that a task is deferred and the KubernetesCreatePodTrigger
will be fired
when the KubernetesPodOperator is executed in deferrable mode when
deferrable=True.
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py
b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 57befdf51d..4f568addba 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -42,7 +42,7 @@ NAMESPACE = "default"
CONN_ID = "test_kubernetes_conn_id"
POLL_INTERVAL = 2
CLUSTER_CONTEXT = "test-context"
-CONFIG_DICT = {"a": "b"}
+CONFIG_FILE = "/path/to/config/file"
IN_CLUSTER = False
SHOULD_DELETE_POD = True
GET_LOGS = True
@@ -61,7 +61,7 @@ def trigger():
kubernetes_conn_id=CONN_ID,
poll_interval=POLL_INTERVAL,
cluster_context=CLUSTER_CONTEXT,
- config_dict=CONFIG_DICT,
+ config_file=CONFIG_FILE,
in_cluster=IN_CLUSTER,
should_delete_pod=SHOULD_DELETE_POD,
get_logs=GET_LOGS,
@@ -88,7 +88,7 @@ class TestKubernetesPodTrigger:
"kubernetes_conn_id": CONN_ID,
"poll_interval": POLL_INTERVAL,
"cluster_context": CLUSTER_CONTEXT,
- "config_dict": CONFIG_DICT,
+ "config_file": CONFIG_FILE,
"in_cluster": IN_CLUSTER,
"should_delete_pod": SHOULD_DELETE_POD,
"get_logs": GET_LOGS,
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index a474c8d474..6fa0184e88 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -313,7 +313,6 @@ class TestGKEPodOperatorAsync:
self.gke_op._cluster_url = CLUSTER_URL
self.gke_op._ssl_ca_cert = SSL_CA_CERT
- @mock.patch(KUB_OP_PATH.format("convert_config_file_to_dict"))
@mock.patch.dict(os.environ, {})
@mock.patch(KUB_OP_PATH.format("build_pod_request_obj"))
@mock.patch(KUB_OP_PATH.format("get_or_create_pod"))
@@ -323,7 +322,7 @@ class TestGKEPodOperatorAsync:
)
@mock.patch(f"{GKE_OP_PATH}.fetch_cluster_info")
def test_async_create_pod_should_execute_successfully(
- self, fetch_cluster_info_mock, get_con_mock, mocked_pod,
mocked_pod_obj, mocked_config
+ self, fetch_cluster_info_mock, get_con_mock, mocked_pod, mocked_pod_obj
):
"""
Asserts that a task is deferred and the GKEStartPodTrigger will be
fired