This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9c1c61537 Pass content of kube/config file to triggerer as a dictionary (#41178)
f9c1c61537 is described below
commit f9c1c615377869120382905ec97e38e7e7916678
Author: VladaZakharova <[email protected]>
AuthorDate: Fri Aug 2 16:22:00 2024 +0200
Pass content of kube/config file to triggerer as a dictionary (#41178)
---
airflow/providers/cncf/kubernetes/hooks/kubernetes.py | 18 +++++++-----------
airflow/providers/cncf/kubernetes/operators/pod.py | 13 ++++++++++++-
airflow/providers/cncf/kubernetes/triggers/pod.py | 10 +++++-----
tests/providers/cncf/kubernetes/operators/test_pod.py | 8 +++++---
tests/providers/cncf/kubernetes/triggers/test_pod.py | 6 +++---
.../google/cloud/operators/test_kubernetes_engine.py | 6 +++++-
6 files changed, 37 insertions(+), 24 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 2f4516cedf..1d3da7dee0 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -653,18 +653,19 @@ def _get_bool(val) -> bool | None:
class AsyncKubernetesHook(KubernetesHook):
"""Hook to use Kubernetes SDK asynchronously."""
- def __init__(self, *args, **kwargs):
+ def __init__(self, config_dict: dict | None = None, *args, **kwargs):
super().__init__(*args, **kwargs)
+
+ self.config_dict = config_dict
self._extras: dict | None = None
async def _load_config(self):
"""Return Kubernetes API session for use with requests."""
in_cluster = self._coalesce_param(self.in_cluster, await
self._get_field("in_cluster"))
cluster_context = self._coalesce_param(self.cluster_context, await
self._get_field("cluster_context"))
- kubeconfig_path = self._coalesce_param(self.config_file, await
self._get_field("kube_config_path"))
kubeconfig = await self._get_field("kube_config")
- num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig,
kubeconfig_path] if o)
+ num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig,
self.config_dict] if o)
if num_selected_configuration > 1:
raise AirflowException(
@@ -679,14 +680,9 @@ class AsyncKubernetesHook(KubernetesHook):
async_config.load_incluster_config()
return async_client.ApiClient()
- if kubeconfig_path:
-
self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("kube_config"))
- self._is_in_cluster = False
- await async_config.load_kube_config(
- config_file=kubeconfig_path,
- client_configuration=self.client_configuration,
- context=cluster_context,
- )
+ if self.config_dict:
+ self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config
dictionary"))
+ await async_config.load_kube_config_from_dict(self.config_dict)
return async_client.ApiClient()
if kubeconfig is not None:
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 8df4137a28..921fdaa927 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -22,6 +22,7 @@ import datetime
import json
import logging
import math
+import os
import re
import shlex
import string
@@ -695,8 +696,18 @@ class KubernetesPodOperator(BaseOperator):
ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)
+ self.convert_config_file_to_dict()
self.invoke_defer_method()
+ def convert_config_file_to_dict(self):
+ """Convert passed config_file to dict representation."""
+ config_file = self.config_file if self.config_file else
os.environ.get(KUBE_CONFIG_ENV_VAR)
+ if config_file:
+ with open(config_file) as f:
+ self._config_dict = yaml.safe_load(f)
+ else:
+ self._config_dict = None
+
def invoke_defer_method(self, last_log_time: DateTime | None = None) ->
None:
"""Redefine triggers which are being used in child classes."""
trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
@@ -707,7 +718,7 @@ class KubernetesPodOperator(BaseOperator):
trigger_start_time=trigger_start_time,
kubernetes_conn_id=self.kubernetes_conn_id,
cluster_context=self.cluster_context,
- config_file=self.config_file,
+ config_dict=self._config_dict,
in_cluster=self.in_cluster,
poll_interval=self.poll_interval,
get_logs=self.get_logs,
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 5fabce5001..102dc43800 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -61,7 +61,7 @@ class KubernetesPodTrigger(BaseTrigger):
:param kubernetes_conn_id: The :ref:`kubernetes connection id
<howto/connection:kubernetes>`
for the Kubernetes cluster.
:param cluster_context: Context that points to kubernetes cluster.
- :param config_file: Path to kubeconfig file.
+ :param config_dict: Content of kubeconfig file in dict format.
:param poll_interval: Polling period in seconds to check for the status.
:param trigger_start_time: time in Datetime format when the trigger was
started
:param in_cluster: run kubernetes client with in_cluster configuration.
@@ -89,7 +89,7 @@ class KubernetesPodTrigger(BaseTrigger):
kubernetes_conn_id: str | None = None,
poll_interval: float = 2,
cluster_context: str | None = None,
- config_file: str | None = None,
+ config_dict: dict | None = None,
in_cluster: bool | None = None,
get_logs: bool = True,
startup_timeout: int = 120,
@@ -107,7 +107,7 @@ class KubernetesPodTrigger(BaseTrigger):
self.kubernetes_conn_id = kubernetes_conn_id
self.poll_interval = poll_interval
self.cluster_context = cluster_context
- self.config_file = config_file
+ self.config_dict = config_dict
self.in_cluster = in_cluster
self.get_logs = get_logs
self.startup_timeout = startup_timeout
@@ -142,7 +142,7 @@ class KubernetesPodTrigger(BaseTrigger):
"kubernetes_conn_id": self.kubernetes_conn_id,
"poll_interval": self.poll_interval,
"cluster_context": self.cluster_context,
- "config_file": self.config_file,
+ "config_dict": self.config_dict,
"in_cluster": self.in_cluster,
"get_logs": self.get_logs,
"startup_timeout": self.startup_timeout,
@@ -282,7 +282,7 @@ class KubernetesPodTrigger(BaseTrigger):
return AsyncKubernetesHook(
conn_id=self.kubernetes_conn_id,
in_cluster=self.in_cluster,
- config_file=self.config_file,
+ config_dict=self.config_dict,
cluster_context=self.cluster_context,
)
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index c847ebcfac..b69b73d692 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -21,7 +21,7 @@ import re
from contextlib import contextmanager, nullcontext
from io import BytesIO
from unittest import mock
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, mock_open, patch
import pendulum
import pytest
@@ -1866,7 +1866,7 @@ class TestKubernetesPodOperatorAsync:
@patch(KUB_OP_PATH.format("build_pod_request_obj"))
@patch(KUB_OP_PATH.format("get_or_create_pod"))
def test_async_create_pod_should_execute_successfully(
- self, mocked_pod, mocked_pod_obj, mocked_found_pod, mocked_client,
do_xcom_push
+ self, mocked_pod, mocked_pod_obj, mocked_found_pod, mocked_client,
do_xcom_push, mocker
):
"""
Asserts that a task is deferred and the KubernetesCreatePodTrigger
will be fired
@@ -1889,7 +1889,9 @@ class TestKubernetesPodOperatorAsync:
deferrable=True,
do_xcom_push=do_xcom_push,
)
- k.config_file_in_dict_representation = {"a": "b"}
+
+ mock_file = mock_open(read_data='{"a": "b"}')
+ mocker.patch("builtins.open", mock_file)
mocked_pod.return_value.metadata.name = TEST_NAME
mocked_pod.return_value.metadata.namespace = TEST_NAMESPACE
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index 610db56c75..0e7522480a 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -39,7 +39,7 @@ NAMESPACE = "default"
CONN_ID = "test_kubernetes_conn_id"
POLL_INTERVAL = 2
CLUSTER_CONTEXT = "test-context"
-CONFIG_FILE = "/path/to/config/file"
+CONFIG_DICT = {"a": "b"}
IN_CLUSTER = False
GET_LOGS = True
STARTUP_TIMEOUT_SECS = 120
@@ -58,7 +58,7 @@ def trigger():
kubernetes_conn_id=CONN_ID,
poll_interval=POLL_INTERVAL,
cluster_context=CLUSTER_CONTEXT,
- config_file=CONFIG_FILE,
+ config_dict=CONFIG_DICT,
in_cluster=IN_CLUSTER,
get_logs=GET_LOGS,
startup_timeout=STARTUP_TIMEOUT_SECS,
@@ -101,7 +101,7 @@ class TestKubernetesPodTrigger:
"kubernetes_conn_id": CONN_ID,
"poll_interval": POLL_INTERVAL,
"cluster_context": CLUSTER_CONTEXT,
- "config_file": CONFIG_FILE,
+ "config_dict": CONFIG_DICT,
"in_cluster": IN_CLUSTER,
"get_logs": GET_LOGS,
"startup_timeout": STARTUP_TIMEOUT_SECS,
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index f8416c2189..e4d9c4b8d5 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import json
import os
from unittest import mock
+from unittest.mock import mock_open
import pytest
from google.cloud.container_v1.types import Cluster, NodePool
@@ -739,12 +740,15 @@ class TestGKEPodOperatorAsync:
)
@mock.patch(f"{GKE_OP_PATH}.fetch_cluster_info")
def test_async_create_pod_should_execute_successfully(
- self, fetch_cluster_info_mock, get_con_mock, mocked_pod, mocked_pod_obj
+ self, fetch_cluster_info_mock, get_con_mock, mocked_pod,
mocked_pod_obj, mocker
):
"""
Asserts that a task is deferred and the GKEStartPodTrigger will be
fired
when the GKEStartPodOperator is executed in deferrable mode when
deferrable=True.
"""
+ mock_file = mock_open(read_data='{"a": "b"}')
+ mocker.patch("builtins.open", mock_file)
+
self.gke_op._cluster_url = CLUSTER_URL
self.gke_op._ssl_ca_cert = SSL_CA_CERT
with pytest.raises(TaskDeferred) as exc: