SameerMesiah97 commented on code in PR #61527:
URL: https://github.com/apache/airflow/pull/61527#discussion_r2779650684

##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.client.exceptions import ApiException
+from kubernetes.config import load_incluster_config
+
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets.
+
+    This backend reads secrets using a naming convention, enabling integration with
+    External Secrets Operator (ESO) or any tool that creates Kubernetes secrets with
+    a predictable naming scheme.
+
+    Configurable via ``airflow.cfg`` like so:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"connections_prefix": "airflow-connections"}
+
+    For example, if the Kubernetes secret name is ``airflow-connections-my-db`` with a data key
+    ``value`` containing a connection URI, this would be accessible if you provide
+    ``{"connections_prefix": "airflow-connections"}`` and request conn_id ``my_db``.
+
+    The secret name is built as ``{prefix}-{key}`` where underscores in the key are
+    replaced with hyphens to conform to Kubernetes DNS naming requirements.
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace is auto-detected from the pod's service account metadata.
+
+    :param connections_prefix: Specifies the prefix of the secret to read to get Connections.
+        If set to None, requests for connections will not be sent to Kubernetes.
+    :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_prefix: Specifies the prefix of the secret to read to get Configurations.
+        If set to None, requests for configurations will not be sent to Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        connections_prefix: str | None = "airflow-connections",
+        variables_prefix: str | None = "airflow-variables",
+        config_prefix: str | None = "airflow-config",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.connections_prefix = connections_prefix
+        self.variables_prefix = variables_prefix
+        self.config_prefix = config_prefix
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Auto-detect namespace from the pod's service account metadata, falling back to 'default'."""
+        try:
+            return Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            return "default"

Review Comment:
   Nit: I would suggest adding this below `except FileNotFoundError`:

   `self.log.debug("Kubernetes namespace file not found; falling back to 'default' namespace")`

   This could be useful when investigating Kubernetes setup or configuration issues.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.client.exceptions import ApiException
+from kubernetes.config import load_incluster_config
+
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets.
+
+    This backend reads secrets using a naming convention, enabling integration with
+    External Secrets Operator (ESO) or any tool that creates Kubernetes secrets with
+    a predictable naming scheme.
+
+    Configurable via ``airflow.cfg`` like so:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"connections_prefix": "airflow-connections"}
+
+    For example, if the Kubernetes secret name is ``airflow-connections-my-db`` with a data key
+    ``value`` containing a connection URI, this would be accessible if you provide
+    ``{"connections_prefix": "airflow-connections"}`` and request conn_id ``my_db``.
+
+    The secret name is built as ``{prefix}-{key}`` where underscores in the key are
+    replaced with hyphens to conform to Kubernetes DNS naming requirements.
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace is auto-detected from the pod's service account metadata.
+
+    :param connections_prefix: Specifies the prefix of the secret to read to get Connections.
+        If set to None, requests for connections will not be sent to Kubernetes.
+    :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_prefix: Specifies the prefix of the secret to read to get Configurations.
+        If set to None, requests for configurations will not be sent to Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        connections_prefix: str | None = "airflow-connections",
+        variables_prefix: str | None = "airflow-variables",
+        config_prefix: str | None = "airflow-config",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.connections_prefix = connections_prefix
+        self.variables_prefix = variables_prefix
+        self.config_prefix = config_prefix
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Auto-detect namespace from the pod's service account metadata, falling back to 'default'."""
+        try:
+            return Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            return "default"
+
+    @cached_property
+    def client(self) -> CoreV1Api:
+        """Lazy-init Kubernetes CoreV1Api client using in-cluster config directly."""
+        load_incluster_config()
+        return CoreV1Api(ApiClient())
+
+    def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | None:
+        """
+        Get serialized representation of Connection from a Kubernetes secret.
+
+        :param conn_id: connection id
+        :param team_name: Team name associated to the task trying to access the connection (if any)
+        """
+        if self.connections_prefix is None:
+            return None
+        return self._get_secret(self.connections_prefix, conn_id, self.connections_data_key)
+
+    def get_variable(self, key: str, team_name: str | None = None) -> str | None:
+        """
+        Get Airflow Variable from a Kubernetes secret.
+
+        :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access the variable (if any)
+        :return: Variable Value
+        """
+        if self.variables_prefix is None:
+            return None
+        return self._get_secret(self.variables_prefix, key, self.variables_data_key)
+
+    def get_config(self, key: str) -> str | None:
+        """
+        Get Airflow Configuration from a Kubernetes secret.
+
+        :param key: Configuration Option Key
+        :return: Configuration Option Value
+        """
+        if self.config_prefix is None:
+            return None
+        return self._get_secret(self.config_prefix, key, self.config_data_key)
+
+    def _get_secret(self, prefix: str, key: str, data_key: str) -> str | None:
+        """
+        Get secret value from Kubernetes.
+
+        Builds the secret name as ``{prefix}-{key}``, sanitizes it for K8s DNS
+        compatibility (underscores to hyphens), reads the secret, and returns the
+        base64-decoded value for the specified data key.
+
+        :param prefix: Prefix for the secret name
+        :param key: Secret key (e.g. conn_id or variable key)
+        :param data_key: The key within the secret's data dict to read
+        :return: Secret value or None if not found
+        """
+        secret_name = self.build_path(prefix, key, "-")
+        # Sanitize for Kubernetes DNS naming: underscores to hyphens, lowercase
+        secret_name = secret_name.replace("_", "-").lower()

Review Comment:
   Are you sure this is doing a comprehensive sanitization of `secret_name`? My understanding is that Secrets, like other namespaced Kubernetes resources, must fully conform to RFC 1123 validation. This includes additional constraints beyond removing underscores and lowercasing, which appear to be the only transformations enforced here.

   Looking at the full implementation, it seems that the inputs for both prefix and key are not fully validated within Airflow, and we therefore rely on users to provide values that map to valid RFC 1123 subdomain names. If they do not (for example, a `prefix` like "-abc" or a `key` that results in a trailing hyphen), the Kubernetes API will reject the request with a 422 error.

   I don’t think this represents a critical issue with the implementation itself, but the current wording suggests that Airflow fully sanitizes secret names, which doesn’t seem accurate. It may be worth adjusting the docstrings/comments to clarify that only minimal normalization is performed and that full validation is delegated to Kubernetes.

   I do wonder why you chose this partial validation approach over full sanitization. Not asking you to do that here but I am curious about the reason.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
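
Illustration for the second review comment above: a minimal sketch, not part of the PR, contrasting the normalization the diff performs with a full RFC 1123 subdomain check. The helper names `normalize_secret_name` and `is_valid_rfc1123_subdomain` are hypothetical. The f-string join is only an assumed stand-in for `self.build_path(prefix, key, "-")`. The pattern and the 253-character limit follow the DNS subdomain rules that Kubernetes documents for object names.

```python
import re

# One DNS label: lowercase alphanumerics and '-', starting and ending alphanumeric.
_LABEL = r"[a-z0-9]([-a-z0-9]*[a-z0-9])?"
# A subdomain is one or more labels separated by dots.
_SUBDOMAIN = re.compile(rf"{_LABEL}(\.{_LABEL})*")
_MAX_LENGTH = 253


def normalize_secret_name(prefix: str, key: str) -> str:
    """Hypothetical helper: join, then underscores to hyphens and lowercase (as in the diff)."""
    return f"{prefix}-{key}".replace("_", "-").lower()


def is_valid_rfc1123_subdomain(name: str) -> bool:
    """Hypothetical helper: True if the name would pass DNS subdomain validation."""
    return len(name) <= _MAX_LENGTH and _SUBDOMAIN.fullmatch(name) is not None


if __name__ == "__main__":
    for prefix, key in [
        ("airflow-connections", "my_db"),  # normalizes to a valid name
        ("-abc", "x"),                     # leading hyphen survives normalization
        ("airflow-variables", "key_"),     # trailing underscore becomes a trailing hyphen
    ]:
        name = normalize_secret_name(prefix, key)
        print(name, is_valid_rfc1123_subdomain(name))
```

Under these assumptions only the first input yields a name the API server would accept. The other two pass through normalization unchanged in the ways that matter, which is the case the comment describes as being rejected by Kubernetes.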
