Nataneljpwd commented on code in PR #61527: URL: https://github.com/apache/airflow/pull/61527#discussion_r2794899019
########## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py: ########## @@ -0,0 +1,215 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets.""" + +from __future__ import annotations + +import base64 +from functools import cached_property +from pathlib import Path + +from kubernetes.client import ApiClient, CoreV1Api +from kubernetes.config import load_incluster_config + +from airflow.exceptions import AirflowException +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin): + """ + Retrieve connections, variables, and configs from Kubernetes Secrets using labels. + + This backend discovers secrets by querying Kubernetes labels, enabling integration + with External Secrets Operator (ESO), Sealed Secrets, or any tool that creates + Kubernetes secrets — regardless of the secret's name. + + Configurable via ``airflow.cfg``: + + .. 
code-block:: ini + + [secrets] + backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend + backend_kwargs = {"namespace": "airflow", "connections_label": "airflow.apache.org/connection-id"} + + The secret must have a label whose key matches the configured label and whose value + matches the requested identifier (conn_id, variable key, or config key). The actual + secret value is read from the ``value`` key in the secret's data. + + Example Kubernetes secret for a connection named ``my_db``: + + .. code-block:: yaml + + apiVersion: v1 + kind: Secret + metadata: + name: anything + labels: + airflow.apache.org/connection-id: my_db + data: + value: <base64-encoded-connection-uri> + + **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly + for in-cluster authentication. Does not use KubernetesHook or any Airflow connection, + avoiding circular dependencies since this IS the secrets backend. + The namespace can be set explicitly via ``backend_kwargs``. If not set, it is + auto-detected from the pod's service account metadata at + ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If auto-detection + fails (e.g. automountServiceAccountToken is disabled), an error is raised. + + **Performance:** Queries use ``resource_version="0"`` so the Kubernetes API server + serves results from its in-memory watch cache, making lookups very fast without + requiring Airflow-side caching. + + :param namespace: Kubernetes namespace to query for secrets. If not set, the + namespace is auto-detected from the pod's service account metadata. If + auto-detection fails, an ``AirflowException`` is raised. + :param connections_label: Label key used to discover connection secrets. + If set to None, requests for connections will not be sent to Kubernetes. + :param variables_label: Label key used to discover variable secrets. + If set to None, requests for variables will not be sent to Kubernetes. 
+ :param config_label: Label key used to discover config secrets. + If set to None, requests for configurations will not be sent to Kubernetes. + :param connections_data_key: The data key in the Kubernetes secret that holds the + connection value. Default: ``"value"`` + :param variables_data_key: The data key in the Kubernetes secret that holds the + variable value. Default: ``"value"`` + :param config_data_key: The data key in the Kubernetes secret that holds the + config value. Default: ``"value"`` + """ + + DEFAULT_CONNECTIONS_LABEL = "airflow.apache.org/connection-id" + DEFAULT_VARIABLES_LABEL = "airflow.apache.org/variable-key" + DEFAULT_CONFIG_LABEL = "airflow.apache.org/config-key" + SERVICE_ACCOUNT_NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" + + def __init__( + self, + namespace: str | None = None, + connections_label: str = DEFAULT_CONNECTIONS_LABEL, + variables_label: str = DEFAULT_VARIABLES_LABEL, + config_label: str = DEFAULT_CONFIG_LABEL, + connections_data_key: str = "value", + variables_data_key: str = "value", + config_data_key: str = "value", + **kwargs, + ): + super().__init__(**kwargs) + self._namespace = namespace + self.connections_label = connections_label + self.variables_label = variables_label + self.config_label = config_label + self.connections_data_key = connections_data_key + self.variables_data_key = variables_data_key + self.config_data_key = config_data_key + + @cached_property + def namespace(self) -> str: + """Return the configured namespace, or auto-detect from service account metadata.""" + if self._namespace: + return self._namespace + try: + return Path(self.SERVICE_ACCOUNT_NAMESPACE_PATH).read_text().strip() + except FileNotFoundError: + raise AirflowException( + f"Could not auto-detect Kubernetes namespace from " + f"{self.SERVICE_ACCOUNT_NAMESPACE_PATH}. " + f"Is automountServiceAccountToken disabled for this pod? " + f"Set the 'namespace' parameter explicitly in backend_kwargs." 
+ ) + + @cached_property + def client(self) -> CoreV1Api: + """Lazy-init Kubernetes CoreV1Api client using in-cluster config directly.""" + load_incluster_config() + return CoreV1Api(ApiClient()) + + def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | None: + """ + Get serialized representation of Connection from a Kubernetes secret. + + :param conn_id: connection id + :param team_name: Team name associated to the task trying to access the connection (if any) + """ + if self.connections_label is None: + return None + return self._get_secret(self.connections_label, conn_id, self.connections_data_key) + + def get_variable(self, key: str, team_name: str | None = None) -> str | None: + """ + Get Airflow Variable from a Kubernetes secret. + + :param key: Variable Key + :param team_name: Team name associated to the task trying to access the variable (if any) + :return: Variable Value + """ + if self.variables_label is None: + return None + return self._get_secret(self.variables_label, key, self.variables_data_key) + + def get_config(self, key: str) -> str | None: + """ + Get Airflow Configuration from a Kubernetes secret. + + :param key: Configuration Option Key + :return: Configuration Option Value + """ + if self.config_label is None: + return None Review Comment: I think this check can be moved into `_get_secret`. That would let us avoid repeating ourselves without creating a whole new method for such simple logic. ########## providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py: ########## @@ -0,0 +1,403 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import base64 +import json +from unittest import mock + +import pytest +from kubernetes.client.exceptions import ApiException + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend import ( + KubernetesSecretsBackend, +) + +MODULE_PATH = "airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend" + + +def _make_secret(data: dict[str, str], name: str = "some-secret"): + """Create a mock V1Secret with base64-encoded data.""" + encoded = {k: base64.b64encode(v.encode("utf-8")).decode("utf-8") for k, v in data.items()} + secret = mock.MagicMock() + secret.data = encoded + secret.metadata.name = name + return secret + + +def _make_secret_list(secrets: list): + """Create a mock V1SecretList with the given items.""" + secret_list = mock.MagicMock() + secret_list.items = secrets + return secret_list + + +class TestKubernetesSecretsBackendConnections: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_uri(self, mock_client, mock_namespace): + """Test reading a connection URI from a Kubernetes secret.""" + uri = "postgresql://user:pass@host:5432/db" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": uri})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result == uri + 
mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/connection-id=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_json(self, mock_client, mock_namespace): + """Test reading a JSON-formatted connection from a Kubernetes secret.""" + conn_json = json.dumps( + { + "conn_type": "postgres", + "login": "user", + "password": "pass", + "host": "host", + "port": 5432, + "schema": "db", + } + ) + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": conn_json})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result == conn_json + parsed = json.loads(result) + assert parsed["conn_type"] == "postgres" + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_not_found(self, mock_client, mock_namespace): + """Test that a missing secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendVariables: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_variable(self, mock_client, mock_namespace): + """Test reading a variable from a Kubernetes secret.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "my-value"})] + ) + + backend = KubernetesSecretsBackend() + result = 
backend.get_variable("api_key") + + assert result == "my-value" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/variable-key=api_key", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_variable_not_found(self, mock_client, mock_namespace): + """Test that a missing variable secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_variable("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendConfig: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_config(self, mock_client, mock_namespace): + """Test reading a config value from a Kubernetes secret.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "sqlite:///airflow.db"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_config("sql_alchemy_conn") + + assert result == "sqlite:///airflow.db" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/config-key=sql_alchemy_conn", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_config_not_found(self, mock_client, mock_namespace): + """Test that a missing config secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = 
backend.get_config("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendCustomConfig: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_custom_label(self, mock_client, mock_namespace): + """Test using a custom label key.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "postgresql://localhost/db"})] + ) + + backend = KubernetesSecretsBackend(connections_label="my-org/conn") + result = backend.get_conn_value("my_db") + + assert result == "postgresql://localhost/db" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="my-org/conn=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_custom_data_key(self, mock_client, mock_namespace): + """Test using a custom data key for connections.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"conn_uri": "postgresql://localhost/db"})] + ) + + backend = KubernetesSecretsBackend(connections_data_key="conn_uri") + result = backend.get_conn_value("my_db") + + assert result == "postgresql://localhost/db" + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_missing_value_data_key_returns_none(self, mock_client, mock_namespace): + """Test that a secret without the 'value' data key returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"wrong_key": "some-value"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert 
result is None + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_secret_with_none_data_returns_none(self, mock_client, mock_namespace): + """Test that a secret with None data returns None.""" + secret = mock.MagicMock() + secret.data = None + secret.metadata.name = "some-secret" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([secret]) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result is None + + +class TestKubernetesSecretsBackendTeamName: Review Comment: From my understanding, we do nothing with the team name for now, so maybe we do not need these tests: they only repeat the existing lookup tests and do not verify anything new. Instead, we could note in the docstring that the backend does not currently support multi-team. Adding it seems trivial (just another label for the team id), but it is your choice; we can also defer it to a future change. ########## providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py: ########## @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets.""" + +from __future__ import annotations + +import base64 +from functools import cached_property +from pathlib import Path + +from kubernetes.client import ApiClient, CoreV1Api +from kubernetes.config import load_incluster_config + +from airflow.exceptions import AirflowException +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + +class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin): + """ + Retrieve connections, variables, and configs from Kubernetes Secrets using labels. + + This backend discovers secrets by querying Kubernetes labels, enabling integration + with External Secrets Operator (ESO), Sealed Secrets, or any tool that creates + Kubernetes secrets — regardless of the secret's name. + + Configurable via ``airflow.cfg``: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend + backend_kwargs = {"namespace": "airflow", "connections_label": "airflow.apache.org/connection-name"} + + The secret must have a label whose key matches the configured label and whose value + matches the requested identifier (conn_id, variable key, or config key). The actual + secret value is read from the ``value`` key in the secret's data. + + Example Kubernetes secret for a connection named ``my_db``: + + .. code-block:: yaml + + apiVersion: v1 + kind: Secret + metadata: + name: anything + labels: + airflow.apache.org/connection-name: my_db + data: + value: <base64-encoded-connection-uri> + + **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly + for in-cluster authentication. Does not use KubernetesHook or any Airflow connection, + avoiding circular dependencies since this IS the secrets backend. + The namespace can be set explicitly via ``backend_kwargs``. 
If not set, it is + auto-detected from the pod's service account metadata at + ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If auto-detection + fails (e.g. automountServiceAccountToken is disabled), an error is raised. + + **Performance:** Queries use ``resource_version="0"`` so the Kubernetes API server + serves results from its in-memory watch cache, making lookups very fast without + requiring Airflow-side caching. + + :param namespace: Kubernetes namespace to query for secrets. If not set, the + namespace is auto-detected from the pod's service account metadata. If + auto-detection fails, an ``AirflowException`` is raised. + :param connections_label: Label key used to discover connection secrets. + If set to None, requests for connections will not be sent to Kubernetes. + :param variables_label: Label key used to discover variable secrets. + If set to None, requests for variables will not be sent to Kubernetes. + :param config_label: Label key used to discover config secrets. + If set to None, requests for configurations will not be sent to Kubernetes. + :param connections_data_key: The data key in the Kubernetes secret that holds the + connection value. Default: ``"value"`` + :param variables_data_key: The data key in the Kubernetes secret that holds the + variable value. Default: ``"value"`` + :param config_data_key: The data key in the Kubernetes secret that holds the + config value. 
Default: ``"value"`` + """ + + def __init__( + self, + namespace: str | None = None, + connections_label: str | None = "airflow.apache.org/connection-name", + variables_label: str | None = "airflow.apache.org/variable-name", + config_label: str | None = "airflow.apache.org/config-name", + connections_data_key: str = "value", + variables_data_key: str = "value", + config_data_key: str = "value", + **kwargs, + ): + super().__init__(**kwargs) + self._namespace = namespace + self.connections_label = connections_label + self.variables_label = variables_label + self.config_label = config_label + self.connections_data_key = connections_data_key + self.variables_data_key = variables_data_key + self.config_data_key = config_data_key + + @cached_property + def namespace(self) -> str: + """Return the configured namespace, or auto-detect from service account metadata.""" + if self._namespace: + return self._namespace + try: + return Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip() + except FileNotFoundError: + raise AirflowException( + "Could not auto-detect Kubernetes namespace from " + "/var/run/secrets/kubernetes.io/serviceaccount/namespace. " + "Is automountServiceAccountToken disabled for this pod? " + "Set the 'namespace' parameter explicitly in backend_kwargs." + ) Review Comment: I do not see a circular import here, so I think using the hook will be fine. Connections can be imported more than once, and importing connections does not import the hook or the backend (or the base backend). Correct me if I am wrong. ########## providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py: ########## @@ -0,0 +1,403 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import base64 +import json +from unittest import mock + +import pytest +from kubernetes.client.exceptions import ApiException + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend import ( + KubernetesSecretsBackend, +) + +MODULE_PATH = "airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend" + + +def _make_secret(data: dict[str, str], name: str = "some-secret"): + """Create a mock V1Secret with base64-encoded data.""" + encoded = {k: base64.b64encode(v.encode("utf-8")).decode("utf-8") for k, v in data.items()} + secret = mock.MagicMock() + secret.data = encoded + secret.metadata.name = name + return secret + + +def _make_secret_list(secrets: list): + """Create a mock V1SecretList with the given items.""" + secret_list = mock.MagicMock() + secret_list.items = secrets + return secret_list + + +class TestKubernetesSecretsBackendConnections: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_uri(self, mock_client, mock_namespace): + """Test reading a connection URI from a Kubernetes secret.""" + uri = "postgresql://user:pass@host:5432/db" + mock_client.return_value.list_namespaced_secret.return_value = 
_make_secret_list( + [_make_secret({"value": uri})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result == uri + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/connection-id=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_json(self, mock_client, mock_namespace): + """Test reading a JSON-formatted connection from a Kubernetes secret.""" + conn_json = json.dumps( + { + "conn_type": "postgres", + "login": "user", + "password": "pass", + "host": "host", + "port": 5432, + "schema": "db", + } + ) + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": conn_json})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result == conn_json + parsed = json.loads(result) + assert parsed["conn_type"] == "postgres" + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_not_found(self, mock_client, mock_namespace): + """Test that a missing secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendVariables: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_variable(self, mock_client, mock_namespace): + """Test reading a variable from a Kubernetes secret.""" + 
mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "my-value"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_variable("api_key") + + assert result == "my-value" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/variable-key=api_key", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_variable_not_found(self, mock_client, mock_namespace): + """Test that a missing variable secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_variable("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendConfig: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_config(self, mock_client, mock_namespace): + """Test reading a config value from a Kubernetes secret.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "sqlite:///airflow.db"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_config("sql_alchemy_conn") + + assert result == "sqlite:///airflow.db" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/config-key=sql_alchemy_conn", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_config_not_found(self, mock_client, mock_namespace): + """Test that a missing config secret returns 
None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_config("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendCustomConfig: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_custom_label(self, mock_client, mock_namespace): + """Test using a custom label key.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "postgresql://localhost/db"})] + ) + + backend = KubernetesSecretsBackend(connections_label="my-org/conn") + result = backend.get_conn_value("my_db") + + assert result == "postgresql://localhost/db" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="my-org/conn=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_custom_data_key(self, mock_client, mock_namespace): + """Test using a custom data key for connections.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"conn_uri": "postgresql://localhost/db"})] + ) + + backend = KubernetesSecretsBackend(connections_data_key="conn_uri") + result = backend.get_conn_value("my_db") + + assert result == "postgresql://localhost/db" + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_missing_value_data_key_returns_none(self, mock_client, mock_namespace): + """Test that a secret without the 'value' data key returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( 
+ [_make_secret({"wrong_key": "some-value"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result is None + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_secret_with_none_data_returns_none(self, mock_client, mock_namespace): + """Test that a secret with None data returns None.""" + secret = mock.MagicMock() + secret.data = None + secret.metadata.name = "some-secret" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([secret]) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result is None + + +class TestKubernetesSecretsBackendTeamName: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_team_name_does_not_affect_conn_lookup(self, mock_client, mock_namespace): + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "uri://val"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db", team_name="my-team") + + assert result == "uri://val" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/connection-id=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_team_name_does_not_affect_variable_lookup(self, mock_client, mock_namespace): + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "val"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_variable("my_key", team_name="my-team") + + 
assert result == "val" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/variable-key=my_key", + resource_version="0", + ) + + +class TestKubernetesSecretsBackendLabelNone: + @mock.patch(f"{MODULE_PATH}._get_secret") + def test_connections_label_none(self, mock_get_secret): + """Test that setting connections_label to None skips connection lookups.""" + backend = KubernetesSecretsBackend(connections_label=None) + result = backend.get_conn_value("my_db") + + assert result is None + mock_get_secret.assert_not_called() + + @mock.patch(f"{MODULE_PATH}._get_secret") + def test_variables_label_none(self, mock_get_secret): + """Test that setting variables_label to None skips variable lookups.""" + backend = KubernetesSecretsBackend(variables_label=None) + result = backend.get_variable("my_var") + + assert result is None + mock_get_secret.assert_not_called() + + @mock.patch(f"{MODULE_PATH}._get_secret") + def test_config_label_none(self, mock_get_secret): + """Test that setting config_label to None skips config lookups.""" + backend = KubernetesSecretsBackend(config_label=None) + result = backend.get_config("my_config") + + assert result is None + mock_get_secret.assert_not_called() + + +class TestKubernetesSecretsBackendMultipleMatches: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_multiple_secrets_uses_first_and_warns(self, mock_client, mock_namespace, caplog): + """Test that multiple matching secrets uses the first and logs a warning.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [ + _make_secret({"value": "first-value"}, name="secret-1"), + _make_secret({"value": "second-value"}, name="secret-2"), + ] + ) + + backend = KubernetesSecretsBackend() + import logging + + with caplog.at_level(logging.WARNING): + result = 
backend.get_conn_value("my_db") + + assert result == "first-value" + assert "Multiple secrets found" in caplog.text + + +class TestKubernetesSecretsBackendResourceVersion: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_resource_version_zero_is_passed(self, mock_client, mock_namespace): + """Test that resource_version='0' is passed for cached API server reads.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "val"})] + ) + + backend = KubernetesSecretsBackend() + backend.get_conn_value("my_db") + + call_kwargs = mock_client.return_value.list_namespaced_secret.call_args + assert call_kwargs.kwargs["resource_version"] == "0" + + +class TestKubernetesSecretsBackendClientInit: Review Comment: If it is possible to use KubernetesHook here instead of loading the in-cluster config directly, this test can probably be removed as well ########## providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py: ########## @@ -0,0 +1,403 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+from __future__ import annotations + +import base64 +import json +from unittest import mock + +import pytest +from kubernetes.client.exceptions import ApiException + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend import ( + KubernetesSecretsBackend, +) + +MODULE_PATH = "airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend" + + +def _make_secret(data: dict[str, str], name: str = "some-secret"): + """Create a mock V1Secret with base64-encoded data.""" + encoded = {k: base64.b64encode(v.encode("utf-8")).decode("utf-8") for k, v in data.items()} + secret = mock.MagicMock() + secret.data = encoded + secret.metadata.name = name + return secret + + +def _make_secret_list(secrets: list): + """Create a mock V1SecretList with the given items.""" + secret_list = mock.MagicMock() + secret_list.items = secrets + return secret_list + + +class TestKubernetesSecretsBackendConnections: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_uri(self, mock_client, mock_namespace): + """Test reading a connection URI from a Kubernetes secret.""" + uri = "postgresql://user:pass@host:5432/db" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": uri})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result == uri + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/connection-id=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_json(self, mock_client, mock_namespace): + """Test 
reading a JSON-formatted connection from a Kubernetes secret.""" + conn_json = json.dumps( + { + "conn_type": "postgres", + "login": "user", + "password": "pass", + "host": "host", + "port": 5432, + "schema": "db", + } + ) + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": conn_json})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result == conn_json + parsed = json.loads(result) + assert parsed["conn_type"] == "postgres" + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_conn_value_not_found(self, mock_client, mock_namespace): + """Test that a missing secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendVariables: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_variable(self, mock_client, mock_namespace): + """Test reading a variable from a Kubernetes secret.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "my-value"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_variable("api_key") + + assert result == "my-value" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/variable-key=api_key", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def 
test_get_variable_not_found(self, mock_client, mock_namespace): + """Test that a missing variable secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_variable("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendConfig: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_config(self, mock_client, mock_namespace): + """Test reading a config value from a Kubernetes secret.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "sqlite:///airflow.db"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_config("sql_alchemy_conn") + + assert result == "sqlite:///airflow.db" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/config-key=sql_alchemy_conn", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_get_config_not_found(self, mock_client, mock_namespace): + """Test that a missing config secret returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([]) + + backend = KubernetesSecretsBackend() + result = backend.get_config("nonexistent") + + assert result is None + + +class TestKubernetesSecretsBackendCustomConfig: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_custom_label(self, mock_client, mock_namespace): + """Test using a custom label key.""" + mock_client.return_value.list_namespaced_secret.return_value = 
_make_secret_list( + [_make_secret({"value": "postgresql://localhost/db"})] + ) + + backend = KubernetesSecretsBackend(connections_label="my-org/conn") + result = backend.get_conn_value("my_db") + + assert result == "postgresql://localhost/db" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="my-org/conn=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_custom_data_key(self, mock_client, mock_namespace): + """Test using a custom data key for connections.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"conn_uri": "postgresql://localhost/db"})] + ) + + backend = KubernetesSecretsBackend(connections_data_key="conn_uri") + result = backend.get_conn_value("my_db") + + assert result == "postgresql://localhost/db" + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_missing_value_data_key_returns_none(self, mock_client, mock_namespace): + """Test that a secret without the 'value' data key returns None.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"wrong_key": "some-value"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result is None + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_secret_with_none_data_returns_none(self, mock_client, mock_namespace): + """Test that a secret with None data returns None.""" + secret = mock.MagicMock() + secret.data = None + secret.metadata.name = "some-secret" + 
mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([secret]) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db") + + assert result is None + + +class TestKubernetesSecretsBackendTeamName: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_team_name_does_not_affect_conn_lookup(self, mock_client, mock_namespace): + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "uri://val"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_conn_value("my_db", team_name="my-team") + + assert result == "uri://val" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/connection-id=my_db", + resource_version="0", + ) + + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_team_name_does_not_affect_variable_lookup(self, mock_client, mock_namespace): + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [_make_secret({"value": "val"})] + ) + + backend = KubernetesSecretsBackend() + result = backend.get_variable("my_key", team_name="my-team") + + assert result == "val" + mock_client.return_value.list_namespaced_secret.assert_called_once_with( + "default", + label_selector="airflow.apache.org/variable-key=my_key", + resource_version="0", + ) + + +class TestKubernetesSecretsBackendLabelNone: + @mock.patch(f"{MODULE_PATH}._get_secret") + def test_connections_label_none(self, mock_get_secret): + """Test that setting connections_label to None skips connection lookups.""" + backend = KubernetesSecretsBackend(connections_label=None) + result = backend.get_conn_value("my_db") + + assert result is None + 
mock_get_secret.assert_not_called() + + @mock.patch(f"{MODULE_PATH}._get_secret") + def test_variables_label_none(self, mock_get_secret): + """Test that setting variables_label to None skips variable lookups.""" + backend = KubernetesSecretsBackend(variables_label=None) + result = backend.get_variable("my_var") + + assert result is None + mock_get_secret.assert_not_called() + + @mock.patch(f"{MODULE_PATH}._get_secret") + def test_config_label_none(self, mock_get_secret): + """Test that setting config_label to None skips config lookups.""" + backend = KubernetesSecretsBackend(config_label=None) + result = backend.get_config("my_config") + + assert result is None + mock_get_secret.assert_not_called() + + +class TestKubernetesSecretsBackendMultipleMatches: + @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default") + @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock) + def test_multiple_secrets_uses_first_and_warns(self, mock_client, mock_namespace, caplog): + """Test that multiple matching secrets uses the first and logs a warning.""" + mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list( + [ + _make_secret({"value": "first-value"}, name="secret-1"), + _make_secret({"value": "second-value"}, name="secret-2"), + ] + ) + + backend = KubernetesSecretsBackend() + import logging + + with caplog.at_level(logging.WARNING): + result = backend.get_conn_value("my_db") + + assert result == "first-value" + assert "Multiple secrets found" in caplog.text + + +class TestKubernetesSecretsBackendResourceVersion: Review Comment: I am not sure this test is needed, since `resource_version="0"` is already asserted in the `assert_called_once_with` checks of several other tests (connections, variables, config, custom label, team name) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
