This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 58d894d071a Add Kubernetes Secrets Backend to cncf.kubernetes provider (#61527)
58d894d071a is described below
commit 58d894d071a57af2978d222b631a676571cbf65f
Author: Piotr Klinski <[email protected]>
AuthorDate: Sun Feb 22 11:21:27 2026 +0100
Add Kubernetes Secrets Backend to cncf.kubernetes provider (#61527)
* Add Kubernetes Secrets Backend to cncf.kubernetes provider
Add a new secrets backend that reads Airflow connections, variables,
and configurations from Kubernetes Secrets. This enables integration
with External Secrets Operator (ESO) or any tool that creates
Kubernetes secrets with a predictable naming scheme.
Key design decisions:
- Uses kubernetes.config.load_incluster_config() directly instead of
KubernetesHook to avoid circular dependencies (the secrets backend
cannot depend on Airflow connections since it IS the mechanism for
resolving them).
- Auto-detects namespace from pod service account metadata with
fallback to 'default'.
- Sanitizes secret names for Kubernetes DNS compatibility by
converting underscores to hyphens and lowercasing.
- Supports configurable prefixes and data keys for connections,
variables, and configurations.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* label-based approach
* Update docs to reflect configurable namespace parameter
- Fix docstring to reference automountServiceAccountToken instead of
"not running inside a Kubernetes pod" (matching error message)
- Update RST prerequisites to mention "target namespace" instead of
assuming same namespace as Airflow pod
- Add namespace as first parameter in backend_kwargs documentation
- Rewrite authentication section to explain namespace override option
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Elevate log level from debug to warning for missing secrets
When a secret or data key is not found during label-based lookup,
a debug message is easy to miss. Upgrading to warning ensures
operators are promptly notified of misconfigured or missing secrets.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Extract label defaults and namespace path to class-level constants
Move hard-coded label keys and service account namespace path to
class constants (DEFAULT_CONNECTIONS_LABEL, DEFAULT_VARIABLES_LABEL,
DEFAULT_CONFIG_LABEL, SERVICE_ACCOUNT_NAMESPACE_PATH) for better
discoverability and a single source of truth. Rename _get_secret_by_label
to _get_secret, fix label values to use standard Airflow conventions
(connection-id, variable-key, config-key), and fix formatting issues.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Remove redundant tests from KubernetesSecretsBackend (#61527)
Address PR review feedback by removing duplicate tests:
- Remove TestKubernetesSecretsBackendTeamName (team_name is ignored,
already covered by existing connection/variable tests)
- Remove TestKubernetesSecretsBackendResourceVersion (resource_version="0"
is already verified in 4+ other tests via assert_called_once_with)
Also document that multi-team isolation is not currently supported in
get_conn_value and get_variable docstrings.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Move None label guard into _get_secret to reduce duplication (#61527)
The `if label is None: return None` check was repeated in get_conn_value,
get_variable, and get_config. Since _get_secret already receives the label
as a parameter, it is the natural place for this guard. This simplifies
the public methods to single-line delegations.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Update
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py
Co-authored-by: Jens Scheffler <[email protected]>
* Fix static checks via prek run -a update-providers-build-files
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
---
providers/cncf/kubernetes/docs/index.rst | 1 +
.../kubernetes-secrets-backend.rst | 253 +++++++++++++++
providers/cncf/kubernetes/provider.yaml | 3 +
.../providers/cncf/kubernetes/get_provider_info.py | 3 +
.../providers/cncf/kubernetes/secrets/__init__.py | 16 +
.../secrets/kubernetes_secrets_backend.py | 218 +++++++++++++
.../tests/unit/cncf/kubernetes/secrets/__init__.py | 16 +
.../secrets/test_kubernetes_secrets_backend.py | 351 +++++++++++++++++++++
8 files changed, 861 insertions(+)
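
The namespace handling described in the commit message (explicit `namespace` first, then the pod's service account file, then an error) can be sketched in isolation. This is a minimal illustration, not the provider code: `resolve_namespace` is a hypothetical helper, and it raises `RuntimeError` where the backend raises `AirflowException`, so that the sketch runs without Airflow installed.

```python
from __future__ import annotations

from pathlib import Path

# Same path the backend uses as its class-level constant.
SERVICE_ACCOUNT_NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"


def resolve_namespace(explicit: str | None = None, path: str = SERVICE_ACCOUNT_NAMESPACE_PATH) -> str:
    """Return the explicit namespace, or read it from the service account file (hypothetical helper)."""
    if explicit:
        return explicit
    try:
        # The file holds the namespace name, possibly followed by a newline.
        return Path(path).read_text().strip()
    except FileNotFoundError:
        # Assumption: RuntimeError stands in for AirflowException here.
        raise RuntimeError(
            f"Could not auto-detect Kubernetes namespace from {path}. "
            "Set the 'namespace' parameter explicitly."
        ) from None
```

Passing `path` as a parameter is only there to make the sketch easy to exercise outside a pod; the backend itself reads the fixed path.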
diff --git a/providers/cncf/kubernetes/docs/index.rst b/providers/cncf/kubernetes/docs/index.rst
index 7f0600942e3..29506bcda1b 100644
--- a/providers/cncf/kubernetes/docs/index.rst
+++ b/providers/cncf/kubernetes/docs/index.rst
@@ -44,6 +44,7 @@
Connection types <connections/kubernetes>
Operators <operators>
+ Secrets backends <secrets-backends/kubernetes-secrets-backend>
.. toctree::
:hidden:
diff --git a/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst b/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst
new file mode 100644
index 00000000000..4f68de3128d
--- /dev/null
+++ b/providers/cncf/kubernetes/docs/secrets-backends/kubernetes-secrets-backend.rst
@@ -0,0 +1,253 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _kubernetes_secrets_backend:
+
+Kubernetes Secrets Backend
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+This topic describes how to configure Airflow to use
+`Kubernetes Secrets <https://kubernetes.io/docs/concepts/configuration/secret/>`__
+as a secrets backend for retrieving connections, variables, and configuration.
+
+This backend discovers secrets using Kubernetes labels, so the secret name does not matter.
+This makes it a natural fit when Airflow is running on Kubernetes and integrates well
+with tools like `External Secrets Operator (ESO) <https://external-secrets.io/>`__,
+`Sealed Secrets <https://sealed-secrets.netlify.app/>`__, or any tool that creates
+Kubernetes secrets -- regardless of naming conventions.
+
+Before you begin
+""""""""""""""""
+
+Before you start, make sure you have performed the following tasks:
+
+1. Include the ``cncf.kubernetes`` provider as part of your Airflow installation:
+
+   .. code-block:: bash
+
+       pip install apache-airflow-providers-cncf-kubernetes
+
+2. Ensure Airflow is running inside a Kubernetes cluster (in-cluster mode).
+
+3. Ensure the pod's service account has permission to list secrets in the target namespace (by default, the namespace where Airflow runs).
+
+Enabling the secret backend
+"""""""""""""""""""""""""""
+
+To enable the Kubernetes secrets backend, specify
+:py:class:`~airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend`
+as the ``backend`` in the ``[secrets]`` section of ``airflow.cfg``.
+
+Here is a sample configuration:
+
+.. code-block:: ini
+
+    [secrets]
+    backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+
+You can also set this with environment variables:
+
+.. code-block:: bash
+
+    export AIRFLOW__SECRETS__BACKEND=airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+
+You can verify the correct setting of the configuration options with the ``airflow config get-value`` command:
+
+.. code-block:: console
+
+    $ airflow config get-value secrets backend
+    airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+
+Backend parameters
+""""""""""""""""""
+
+The following parameters can be passed via ``backend_kwargs`` as a JSON dictionary:
+
+* ``namespace``: Kubernetes namespace to query for secrets. If not set, auto-detected from the pod's service account. Default: auto-detect
+* ``connections_label``: Label key used to discover connection secrets. Default: ``"airflow.apache.org/connection-id"``
+* ``variables_label``: Label key used to discover variable secrets. Default: ``"airflow.apache.org/variable-key"``
+* ``config_label``: Label key used to discover config secrets. Default: ``"airflow.apache.org/config-key"``
+* ``connections_data_key``: The data key in the Kubernetes secret that holds the connection value. Default: ``"value"``
+* ``variables_data_key``: The data key in the Kubernetes secret that holds the variable value. Default: ``"value"``
+* ``config_data_key``: The data key in the Kubernetes secret that holds the config value. Default: ``"value"``
+
+For example, if you want to use custom label keys:
+
+.. code-block:: ini
+
+    [secrets]
+    backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+    backend_kwargs = {"connections_label": "my-org.io/connection", "variables_label": "my-org.io/variable"}
+
+Authentication
+""""""""""""""
+
+The backend uses in-cluster Kubernetes authentication directly via
+``kubernetes.config.load_incluster_config()``. By default, the namespace is auto-detected
+from the pod's service account metadata
+(``/var/run/secrets/kubernetes.io/serviceaccount/namespace``). You can override this by
+setting the ``namespace`` parameter in ``backend_kwargs`` to query secrets from a different
+namespace. No additional authentication configuration is required.
+
+The backend does **not** use an Airflow connection or KubernetesHook, since the secrets backend
+itself is used to resolve connections (using a connection would create a circular dependency).
+
+Optional lookup
+"""""""""""""""
+
+Connections, variables, and config can each be looked up independently or in any combination.
+Disabling a type prevents requests for it from being sent to the Kubernetes API.
+
+If you want to look up some and not others, set the relevant ``*_label`` parameter to ``null``.
+
+For example, if you only want to look up connections and not variables or config:
+
+.. code-block:: ini
+
+    [secrets]
+    backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+    backend_kwargs = {"variables_label": null, "config_label": null}
+
+Performance
+"""""""""""
+
+The backend queries the Kubernetes API with ``resource_version="0"``, which tells the API server to
+serve results from its in-memory watch cache. This makes lookups very fast without requiring any
+Airflow-side caching.
+
+If multiple secrets match the same label, the backend will use the first one and log a warning.
+
+Storing and Retrieving Connections
+""""""""""""""""""""""""""""""""""
+
+To store a connection, create a Kubernetes secret with a label whose key matches ``connections_label``
+(default: ``airflow.apache.org/connection-id``) and whose value is the connection id.
+The actual secret value goes in the ``value`` data key (or whatever ``connections_data_key`` is set to).
+
+The value should be the
+:ref:`connection URI representation <generating_connection_uri>` or the
+:ref:`JSON format <connection-serialization-json-example>` of the connection object.
+
+Example secret YAML for a connection named ``smtp_default``:
+
+.. code-block:: yaml
+
+    apiVersion: v1
+    kind: Secret
+    metadata:
+      name: my-smtp-secret  # name can be anything
+      labels:
+        airflow.apache.org/connection-id: smtp_default
+    data:
+      value: <base64-encoded-connection-uri>
+
+You can create a connection secret with ``kubectl``:
+
+.. code-block:: bash
+
+    kubectl create secret generic my-smtp-secret \
+      --from-literal=value='smtp://user:password@smtp.example.com:587' \
+      --namespace=airflow
+    kubectl label secret my-smtp-secret \
+      airflow.apache.org/connection-id=smtp_default \
+      --namespace=airflow
+
+Or using a JSON connection format:
+
+.. code-block:: bash
+
+    kubectl create secret generic my-postgres-secret \
+      --from-literal=value='{"conn_type": "postgres", "host": "db.example.com", "login": "user", "password": "pass", "port": 5432, "schema": "mydb"}' \
+      --namespace=airflow
+    kubectl label secret my-postgres-secret \
+      airflow.apache.org/connection-id=my_postgres_db \
+      --namespace=airflow
+
+Storing and Retrieving Variables
+""""""""""""""""""""""""""""""""
+
+To store a variable, create a Kubernetes secret with a label whose key matches ``variables_label``
+(default: ``airflow.apache.org/variable-key``) and whose value is the variable key.
+
+Example secret YAML for a variable named ``my_var``:
+
+.. code-block:: yaml
+
+    apiVersion: v1
+    kind: Secret
+    metadata:
+      name: my-var-secret  # name can be anything
+      labels:
+        airflow.apache.org/variable-key: my_var
+    data:
+      value: <base64-encoded-variable-value>
+
+You can create a variable secret with ``kubectl``:
+
+.. code-block:: bash
+
+    kubectl create secret generic my-var-secret \
+      --from-literal=value='my_secret_value' \
+      --namespace=airflow
+    kubectl label secret my-var-secret \
+      airflow.apache.org/variable-key=my_var \
+      --namespace=airflow
+
+Using with External Secrets Operator
+"""""""""""""""""""""""""""""""""""""
+
+The `External Secrets Operator (ESO) <https://external-secrets.io/>`__ can synchronize secrets from
+external stores (AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager, Azure Key Vault, etc.)
+into Kubernetes secrets. This backend works seamlessly with ESO -- simply configure ESO to add the
+appropriate Airflow label to the generated Kubernetes secret, and Airflow will discover it
+automatically. The secret name does not matter, only the label.
+
+For example, an ESO ``ExternalSecret`` resource can use ``metadata.labels`` in its target template
+to set ``airflow.apache.org/connection-id: <conn-id>``.
+
+This pattern allows you to use a single secrets backend configuration in Airflow while managing
+the actual secret values in your preferred external secret store.
+
+Checking configuration
+""""""""""""""""""""""
+
+You can use the ``airflow connections get`` command to check if the connection is correctly read from
+the backend secret:
+
+.. code-block:: console
+
+    $ airflow connections get smtp_default
+    Id: null
+    Connection Id: smtp_default
+    Connection Type: smtp
+    Host: smtp.example.com
+    Schema: ''
+    Login: user
+    Password: password
+    Port: 587
+    Is Encrypted: null
+    Is Extra Encrypted: null
+    Extra: {}
+    URI: smtp://user:password@smtp.example.com:587
+
+To check that variables are correctly read from the backend secret, you can use
+``airflow variables get``:
+
+.. code-block:: console
+
+    $ airflow variables get my_var
+    my_secret_value
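
The documentation added above leaves the `data.value` field as a `<base64-encoded-connection-uri>` placeholder. As a rough sketch of how such a manifest could be produced programmatically, the snippet below builds one as a plain dictionary. `build_connection_secret` is a hypothetical helper, not part of the provider; the label key and data key defaults are the ones documented above.

```python
import base64
import json


def build_connection_secret(
    name: str,
    conn_id: str,
    conn_value: str,
    label_key: str = "airflow.apache.org/connection-id",
    data_key: str = "value",
) -> dict:
    """Build a Secret manifest whose label carries the connection id (hypothetical helper)."""
    # Kubernetes expects every entry under `data` to be base64-encoded.
    encoded = base64.b64encode(conn_value.encode("utf-8")).decode("ascii")
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "labels": {label_key: conn_id}},
        "data": {data_key: encoded},
    }


manifest = build_connection_secret(
    name="my-smtp-secret",  # the name is arbitrary; only the label is used for discovery
    conn_id="smtp_default",
    conn_value="smtp://user:password@smtp.example.com:587",
)
print(json.dumps(manifest, indent=2))
```

Because `kubectl apply -f -` accepts JSON as well as YAML, the printed manifest can be piped to it directly.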
diff --git a/providers/cncf/kubernetes/provider.yaml b/providers/cncf/kubernetes/provider.yaml
index 0b70beb9090..20faf60ad51 100644
--- a/providers/cncf/kubernetes/provider.yaml
+++ b/providers/cncf/kubernetes/provider.yaml
@@ -161,6 +161,9 @@ triggers:
   - airflow.providers.cncf.kubernetes.triggers.job
+secrets-backends:
+  - airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+
 connection-types:
   - hook-class-name: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook
     connection-type: kubernetes
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py
index 36db629f38b..9e4d433827e 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py
@@ -75,6 +75,9 @@ def get_provider_info():
                 ],
             }
         ],
+        "secrets-backends": [
+            "airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend"
+        ],
         "connection-types": [
             {
                 "hook-class-name": "airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook",
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/__init__.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py
new file mode 100644
index 00000000000..ea67f7c3da6
--- /dev/null
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py
@@ -0,0 +1,218 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.config import load_incluster_config
+
+from airflow.exceptions import AirflowException
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets using labels.
+
+    This backend discovers secrets by querying Kubernetes labels, enabling integration
+    with External Secrets Operator (ESO), Sealed Secrets, or any tool that creates
+    Kubernetes secrets — regardless of the secret's name.
+
+    Configurable via ``airflow.cfg``:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"namespace": "airflow", "connections_label": "airflow.apache.org/connection-id"}
+
+    The secret must have a label whose key matches the configured label and whose value
+    matches the requested identifier (conn_id, variable key, or config key). The actual
+    secret value is read from the ``value`` key in the secret's data.
+
+    Example Kubernetes secret for a connection named ``my_db``:
+
+    .. code-block:: yaml
+
+        apiVersion: v1
+        kind: Secret
+        metadata:
+          name: anything
+          labels:
+            airflow.apache.org/connection-id: my_db
+        data:
+          value: <base64-encoded-connection-uri>
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace can be set explicitly via ``backend_kwargs``. If not set, it is
+    auto-detected from the pod's service account metadata at
+    ``/var/run/secrets/kubernetes.io/serviceaccount/namespace``. If auto-detection
+    fails (e.g. ``automountServiceAccountToken`` is disabled), an error is raised.
+
+    **Performance:** Queries use ``resource_version="0"`` so the Kubernetes API server
+    serves results from its in-memory watch cache, making lookups very fast without
+    requiring Airflow-side caching.
+
+    :param namespace: Kubernetes namespace to query for secrets. If not set, the
+        namespace is auto-detected from the pod's service account metadata. If
+        auto-detection fails, an ``AirflowException`` is raised.
+    :param connections_label: Label key used to discover connection secrets.
+        If set to None, requests for connections will not be sent to Kubernetes.
+    :param variables_label: Label key used to discover variable secrets.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_label: Label key used to discover config secrets.
+        If set to None, requests for configurations will not be sent to Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds the
+        config value. Default: ``"value"``
+    """
+
+    DEFAULT_CONNECTIONS_LABEL = "airflow.apache.org/connection-id"
+    DEFAULT_VARIABLES_LABEL = "airflow.apache.org/variable-key"
+    DEFAULT_CONFIG_LABEL = "airflow.apache.org/config-key"
+    SERVICE_ACCOUNT_NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+
+    def __init__(
+        self,
+        namespace: str | None = None,
+        connections_label: str = DEFAULT_CONNECTIONS_LABEL,
+        variables_label: str = DEFAULT_VARIABLES_LABEL,
+        config_label: str = DEFAULT_CONFIG_LABEL,
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self._namespace = namespace
+        self.connections_label = connections_label
+        self.variables_label = variables_label
+        self.config_label = config_label
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Return the configured namespace, or auto-detect from service account metadata."""
+        if self._namespace:
+            return self._namespace
+        try:
+            return Path(self.SERVICE_ACCOUNT_NAMESPACE_PATH).read_text().strip()
+        except FileNotFoundError:
+            raise AirflowException(
+                f"Could not auto-detect Kubernetes namespace from "
+                f"{self.SERVICE_ACCOUNT_NAMESPACE_PATH}. "
+                f"Is automountServiceAccountToken disabled for this pod? "
+                f"Set the 'namespace' parameter explicitly in backend_kwargs."
+            )
+
+    @cached_property
+    def client(self) -> CoreV1Api:
+        """Lazy-init Kubernetes CoreV1Api client using in-cluster config directly."""
+        load_incluster_config()
+        return CoreV1Api(ApiClient())
+
+    def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | None:
+        """
+        Get serialized representation of Connection from a Kubernetes secret.
+
+        Multi-team isolation is not currently supported; ``team_name`` is accepted
+        for API compatibility but ignored.
+
+        :param conn_id: connection id
+        :param team_name: Team name (unused — multi-team is not currently supported)
+        """
+        return self._get_secret(self.connections_label, conn_id, self.connections_data_key)
+
+    def get_variable(self, key: str, team_name: str | None = None) -> str | None:
+        """
+        Get Airflow Variable from a Kubernetes secret.
+
+        Multi-team isolation is not currently supported; ``team_name`` is accepted
+        for API compatibility but ignored.
+
+        :param key: Variable Key
+        :param team_name: Team name (unused — multi-team is not currently supported)
+        :return: Variable Value
+        """
+        return self._get_secret(self.variables_label, key, self.variables_data_key)
+
+    def get_config(self, key: str) -> str | None:
+        """
+        Get Airflow Configuration from a Kubernetes secret.
+
+        :param key: Configuration Option Key
+        :return: Configuration Option Value
+        """
+        return self._get_secret(self.config_label, key, self.config_data_key)
+
+    def _get_secret(self, label_key: str | None, label_value: str, data_key: str) -> str | None:
+        """
+        Get secret value from Kubernetes by label selector.
+
+        Queries for secrets with a label ``{label_key}={label_value}`` using
+        ``resource_version="0"`` for fast cached reads from the API server.
+
+        :param label_key: The label key to search for. If None, returns None immediately
+            (used to skip lookups when a label is not configured).
+        :param label_value: The label value to match (e.g. conn_id or variable key)
+        :param data_key: The key within the secret's data dict to read
+        :return: Secret value or None if not found
+        """
+        if label_key is None:
+            return None
+        label_selector = f"{label_key}={label_value}"
+        secret_list = self.client.list_namespaced_secret(
+            self.namespace,
+            label_selector=label_selector,
+            resource_version="0",
+        )
+        if not secret_list.items:
+            self.log.warning(
+                "No secret found with label %s in namespace %s.",
+                label_selector,
+                self.namespace,
+            )
+            return None
+        if len(secret_list.items) > 1:
+            self.log.warning(
+                "Multiple secrets found with label %s in namespace %s. Using the first one.",
+                label_selector,
+                self.namespace,
+            )
+        secret = secret_list.items[0]
+        if secret.data is None or data_key not in secret.data:
+            self.log.warning(
+                "Secret '%s' does not have data key '%s'.",
+                secret.metadata.name,
+                data_key,
+            )
+            return None
+        return base64.b64decode(secret.data[data_key]).decode("utf-8")
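
The lookup flow in `_get_secret` above (skip when the label is `None`, select by label, take the first match, base64-decode the data key) can be tried without a cluster. The sketch below is an illustration under stated assumptions: `make_secret` and `lookup` are hypothetical helpers, an in-memory list stands in for the Kubernetes API, and label matching is done in Python rather than by the API server.

```python
from __future__ import annotations

import base64
from types import SimpleNamespace


def make_secret(name: str, labels: dict, data: dict) -> SimpleNamespace:
    """Build a stand-in for a V1Secret with base64-encoded data (hypothetical helper)."""
    encoded = {k: base64.b64encode(v.encode("utf-8")).decode("ascii") for k, v in data.items()}
    return SimpleNamespace(metadata=SimpleNamespace(name=name, labels=labels), data=encoded)


def lookup(secrets: list, label_key: str | None, label_value: str, data_key: str = "value") -> str | None:
    """Mimic the backend's flow against an in-memory list instead of the API server."""
    if label_key is None:
        return None  # lookups for this type are disabled
    matches = [s for s in secrets if s.metadata.labels.get(label_key) == label_value]
    if not matches:
        return None
    secret = matches[0]  # the first match wins when several secrets share a label
    if secret.data is None or data_key not in secret.data:
        return None
    return base64.b64decode(secret.data[data_key]).decode("utf-8")


secrets = [
    make_secret(
        "my-smtp-secret",
        {"airflow.apache.org/connection-id": "smtp_default"},
        {"value": "smtp://user:password@smtp.example.com:587"},
    ),
    make_secret(
        "my-var-secret",
        {"airflow.apache.org/variable-key": "my_var"},
        {"value": "my_secret_value"},
    ),
]

print(lookup(secrets, "airflow.apache.org/variable-key", "my_var"))
```

The secret names play no role in the result, which is the point of the label-based design.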
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/__init__.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py
new file mode 100644
index 00000000000..6dced3d340b
--- /dev/null
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/secrets/test_kubernetes_secrets_backend.py
@@ -0,0 +1,351 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import base64
+import json
+from unittest import mock
+
+import pytest
+from kubernetes.client.exceptions import ApiException
+
+from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend import (
+    KubernetesSecretsBackend,
+)
+
+MODULE_PATH = "airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend"
+
+
+def _make_secret(data: dict[str, str], name: str = "some-secret"):
+    """Create a mock V1Secret with base64-encoded data."""
+    encoded = {k: base64.b64encode(v.encode("utf-8")).decode("utf-8") for k, v in data.items()}
+    secret = mock.MagicMock()
+    secret.data = encoded
+    secret.metadata.name = name
+    return secret
+
+
+def _make_secret_list(secrets: list):
+    """Create a mock V1SecretList with the given items."""
+    secret_list = mock.MagicMock()
+    secret_list.items = secrets
+    return secret_list
+
+
+class TestKubernetesSecretsBackendConnections:
+    @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+    @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+    def test_get_conn_value_uri(self, mock_client, mock_namespace):
+        """Test reading a connection URI from a Kubernetes secret."""
+        uri = "postgresql://user:pass@host:5432/db"
+        mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+            [_make_secret({"value": uri})]
+        )
+
+        backend = KubernetesSecretsBackend()
+        result = backend.get_conn_value("my_db")
+
+        assert result == uri
+        mock_client.return_value.list_namespaced_secret.assert_called_once_with(
+            "default",
+            label_selector="airflow.apache.org/connection-id=my_db",
+            resource_version="0",
+        )
+
+    @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+    @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+    def test_get_conn_value_json(self, mock_client, mock_namespace):
+        """Test reading a JSON-formatted connection from a Kubernetes secret."""
+        conn_json = json.dumps(
+            {
+                "conn_type": "postgres",
+                "login": "user",
+                "password": "pass",
+                "host": "host",
+                "port": 5432,
+                "schema": "db",
+            }
+        )
+        mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+            [_make_secret({"value": conn_json})]
+        )
+
+        backend = KubernetesSecretsBackend()
+        result = backend.get_conn_value("my_db")
+
+        assert result == conn_json
+        parsed = json.loads(result)
+        assert parsed["conn_type"] == "postgres"
+
+    @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+    @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+    def test_get_conn_value_not_found(self, mock_client, mock_namespace):
+        """Test that a missing secret returns None."""
+        mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([])
+
+        backend = KubernetesSecretsBackend()
+        result = backend.get_conn_value("nonexistent")
+
+        assert result is None
+
+
+class TestKubernetesSecretsBackendVariables:
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_get_variable(self, mock_client, mock_namespace):
+ """Test reading a variable from a Kubernetes secret."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+ [_make_secret({"value": "my-value"})]
+ )
+
+ backend = KubernetesSecretsBackend()
+ result = backend.get_variable("api_key")
+
+ assert result == "my-value"
+ mock_client.return_value.list_namespaced_secret.assert_called_once_with(
+ "default",
+ label_selector="airflow.apache.org/variable-key=api_key",
+ resource_version="0",
+ )
+
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_get_variable_not_found(self, mock_client, mock_namespace):
+ """Test that a missing variable secret returns None."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([])
+
+ backend = KubernetesSecretsBackend()
+ result = backend.get_variable("nonexistent")
+
+ assert result is None
+
+
+class TestKubernetesSecretsBackendConfig:
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_get_config(self, mock_client, mock_namespace):
+ """Test reading a config value from a Kubernetes secret."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+ [_make_secret({"value": "sqlite:///airflow.db"})]
+ )
+
+ backend = KubernetesSecretsBackend()
+ result = backend.get_config("sql_alchemy_conn")
+
+ assert result == "sqlite:///airflow.db"
+ mock_client.return_value.list_namespaced_secret.assert_called_once_with(
+ "default",
+ label_selector="airflow.apache.org/config-key=sql_alchemy_conn",
+ resource_version="0",
+ )
+
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_get_config_not_found(self, mock_client, mock_namespace):
+ """Test that a missing config secret returns None."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([])
+
+ backend = KubernetesSecretsBackend()
+ result = backend.get_config("nonexistent")
+
+ assert result is None
+
+
+class TestKubernetesSecretsBackendCustomConfig:
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_custom_label(self, mock_client, mock_namespace):
+ """Test using a custom label key."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+ [_make_secret({"value": "postgresql://localhost/db"})]
+ )
+
+ backend = KubernetesSecretsBackend(connections_label="my-org/conn")
+ result = backend.get_conn_value("my_db")
+
+ assert result == "postgresql://localhost/db"
+ mock_client.return_value.list_namespaced_secret.assert_called_once_with(
+ "default",
+ label_selector="my-org/conn=my_db",
+ resource_version="0",
+ )
+
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_custom_data_key(self, mock_client, mock_namespace):
+ """Test using a custom data key for connections."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+ [_make_secret({"conn_uri": "postgresql://localhost/db"})]
+ )
+
+ backend = KubernetesSecretsBackend(connections_data_key="conn_uri")
+ result = backend.get_conn_value("my_db")
+
+ assert result == "postgresql://localhost/db"
+
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_missing_value_data_key_returns_none(self, mock_client, mock_namespace):
+ """Test that a secret without the 'value' data key returns None."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+ [_make_secret({"wrong_key": "some-value"})]
+ )
+
+ backend = KubernetesSecretsBackend()
+ result = backend.get_conn_value("my_db")
+
+ assert result is None
+
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_secret_with_none_data_returns_none(self, mock_client, mock_namespace):
+ """Test that a secret with None data returns None."""
+ secret = mock.MagicMock()
+ secret.data = None
+ secret.metadata.name = "some-secret"
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list([secret])
+
+ backend = KubernetesSecretsBackend()
+ result = backend.get_conn_value("my_db")
+
+ assert result is None
+
+
+class TestKubernetesSecretsBackendLabelNone:
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_connections_label_none(self, mock_client):
+ """Test that setting connections_label to None skips connection lookups."""
+ backend = KubernetesSecretsBackend(connections_label=None)
+ result = backend.get_conn_value("my_db")
+
+ assert result is None
+ mock_client.return_value.list_namespaced_secret.assert_not_called()
+
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_variables_label_none(self, mock_client):
+ """Test that setting variables_label to None skips variable lookups."""
+ backend = KubernetesSecretsBackend(variables_label=None)
+ result = backend.get_variable("my_var")
+
+ assert result is None
+ mock_client.return_value.list_namespaced_secret.assert_not_called()
+
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_config_label_none(self, mock_client):
+ """Test that setting config_label to None skips config lookups."""
+ backend = KubernetesSecretsBackend(config_label=None)
+ result = backend.get_config("my_config")
+
+ assert result is None
+ mock_client.return_value.list_namespaced_secret.assert_not_called()
+
+
+class TestKubernetesSecretsBackendMultipleMatches:
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_multiple_secrets_uses_first_and_warns(self, mock_client, mock_namespace, caplog):
+ """Test that multiple matching secrets uses the first and logs a warning."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+ [
+ _make_secret({"value": "first-value"}, name="secret-1"),
+ _make_secret({"value": "second-value"}, name="secret-2"),
+ ]
+ )
+
+ backend = KubernetesSecretsBackend()
+ import logging
+
+ with caplog.at_level(logging.WARNING):
+ result = backend.get_conn_value("my_db")
+
+ assert result == "first-value"
+ assert "Multiple secrets found" in caplog.text
+
+
+class TestKubernetesSecretsBackendClientInit:
+ @mock.patch("airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.CoreV1Api")
+ @mock.patch("airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.ApiClient")
+ @mock.patch("airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.load_incluster_config")
+ def test_client_uses_incluster_config(self, mock_load_incluster, mock_api_client, mock_core_v1):
+ """Test that the client is obtained via load_incluster_config directly."""
+ backend = KubernetesSecretsBackend()
+ result = backend.client
+
+ mock_load_incluster.assert_called_once()
+ mock_api_client.assert_called_once()
+ mock_core_v1.assert_called_once_with(mock_api_client.return_value)
+ assert result is mock_core_v1.return_value
+
+
+class TestKubernetesSecretsBackendNamespace:
+ @mock.patch("airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.Path")
+ def test_namespace_auto_detected(self, mock_path_cls):
+ """Test that namespace is auto-detected from the service account metadata."""
+ mock_path_cls.return_value.read_text.return_value = "airflow\n"
+
+ backend = KubernetesSecretsBackend()
+ assert backend.namespace == "airflow"
+
+ mock_path_cls.assert_called_once_with(KubernetesSecretsBackend.SERVICE_ACCOUNT_NAMESPACE_PATH)
+
+ @mock.patch("airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.Path")
+ def test_namespace_raises_when_not_found(self, mock_path_cls):
+ """Test that namespace raises AirflowException when file is not found."""
+ mock_path_cls.return_value.read_text.side_effect = FileNotFoundError
+
+ backend = KubernetesSecretsBackend()
+ with pytest.raises(
+ AirflowException, match="Could not auto-detect Kubernetes namespace.*automountServiceAccountToken"
+ ):
+ _ = backend.namespace
+
+ def test_namespace_explicit(self):
+ """Test that an explicitly passed namespace is used without reading the file."""
+ backend = KubernetesSecretsBackend(namespace="my-ns")
+ assert backend.namespace == "my-ns"
+
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="airflow")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_namespace_used_in_api_calls(self, mock_client, mock_namespace):
+ """Test that auto-detected namespace is used when listing secrets."""
+ mock_client.return_value.list_namespaced_secret.return_value = _make_secret_list(
+ [_make_secret({"value": "postgresql://localhost/db"})]
+ )
+
+ backend = KubernetesSecretsBackend()
+ backend.get_conn_value("my_db")
+
+ mock_client.return_value.list_namespaced_secret.assert_called_once_with(
+ "airflow",
+ label_selector="airflow.apache.org/connection-id=my_db",
+ resource_version="0",
+ )
+
+
+class TestKubernetesSecretsBackendApiErrors:
+ @mock.patch(f"{MODULE_PATH}.namespace", new_callable=mock.PropertyMock, return_value="default")
+ @mock.patch(f"{MODULE_PATH}.client", new_callable=mock.PropertyMock)
+ def test_api_exception_is_raised(self, mock_client, mock_namespace):
+ """Test that API exceptions are re-raised."""
+ mock_client.return_value.list_namespaced_secret.side_effect = ApiException(status=403)
+
+ backend = KubernetesSecretsBackend()
+
+ with pytest.raises(ApiException) as exc_info:
+ backend.get_conn_value("my_db")
+ assert exc_info.value.status == 403