This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae46b9e721c fix: kpo async kube_config_path (#45571)
ae46b9e721c is described below
commit ae46b9e721ceb1e9d6a8536abbd501a3aeda6244
Author: raphaelauv <[email protected]>
AuthorDate: Fri Feb 7 14:50:15 2025 +0100
fix: kpo async kube_config_path (#45571)
---
.../providers/cncf/kubernetes/hooks/kubernetes.py | 16 ++++++++--
.../cncf/kubernetes/hooks/test_kubernetes.py | 35 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 2 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 114a933e50e..0c9b655bd1f 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -734,9 +734,11 @@ class AsyncKubernetesHook(KubernetesHook):
"""Return Kubernetes API session for use with requests."""
in_cluster = self._coalesce_param(self.in_cluster, await
self._get_field("in_cluster"))
cluster_context = self._coalesce_param(self.cluster_context, await
self._get_field("cluster_context"))
+ kubeconfig_path = await self._get_field("kube_config_path")
kubeconfig = await self._get_field("kube_config")
-
- num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig,
self.config_dict] if o)
+ num_selected_configuration = sum(
+ 1 for o in [in_cluster, kubeconfig, kubeconfig_path,
self.config_dict] if o
+ )
if num_selected_configuration > 1:
raise AirflowException(
@@ -757,6 +759,16 @@ class AsyncKubernetesHook(KubernetesHook):
await async_config.load_kube_config_from_dict(self.config_dict)
return async_client.ApiClient()
+ if kubeconfig_path is not None:
+ self.log.debug("loading kube_config from: %s", kubeconfig_path)
+ self._is_in_cluster = False
+ await async_config.load_kube_config(
+ config_file=kubeconfig_path,
+ client_configuration=self.client_configuration,
+ context=cluster_context,
+ )
+ return async_client.ApiClient()
+
if kubeconfig is not None:
async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
self.log.debug(
diff --git
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py
index ae9514b5b8e..97d00a5c3b2 100644
---
a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py
+++
b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py
@@ -24,8 +24,10 @@ from asyncio import Future
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch
+import anyio
import kubernetes
import pytest
+import yaml
from kubernetes.client import V1Deployment, V1DeploymentStatus
from kubernetes.client.rest import ApiException
from kubernetes.config import ConfigException
@@ -898,6 +900,39 @@ class TestAsyncKubernetesHook:
kube_config_loader.assert_called_once()
kube_config_merger.assert_called_once()
+ @pytest.mark.asyncio
+ @mock.patch(INCLUSTER_CONFIG_LOADER)
+ @mock.patch(KUBE_CONFIG_MERGER)
+ async def test_load_config_with_conn_id_kube_config_path(
+ self, kube_config_merger, incluster_config, kube_config_loader,
tmp_path
+ ):
+ file_name = f"{tmp_path}/config"
+ extra = {"kube_config_path": file_name}
+ try:
+ merge_conn(
+ Connection(
+ conn_type="kubernetes",
+ conn_id=CONN_ID,
+ extra=json.dumps(extra),
+ ),
+ )
+ async with await anyio.open_file(file_name, "w+") as f:
+ yaml.dump({"a": "b"}, f)
+ hook = AsyncKubernetesHook(
+ conn_id=CONN_ID,
+ in_cluster=False,
+ config_file=None,
+ cluster_context=None,
+ )
+ await hook._load_config()
+ assert not incluster_config.called
+ kube_config_loader.assert_called_once()
+ kube_config_merger.assert_called_once()
+ except:
+ raise
+ finally:
+ clear_db_connections()
+
@pytest.mark.asyncio
@mock.patch(INCLUSTER_CONFIG_LOADER)
@mock.patch(KUBE_CONFIG_MERGER)