SameerMesiah97 commented on code in PR #61527:
URL: https://github.com/apache/airflow/pull/61527#discussion_r2779650684


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.client.exceptions import ApiException
+from kubernetes.config import load_incluster_config
+
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets.
+
+    This backend reads secrets using a naming convention, enabling integration with
+    External Secrets Operator (ESO) or any tool that creates Kubernetes secrets with
+    a predictable naming scheme.
+
+    Configurable via ``airflow.cfg`` like so:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"connections_prefix": "airflow-connections"}
+
+    For example, if the Kubernetes secret name is ``airflow-connections-my-db`` with a data key
+    ``value`` containing a connection URI, this would be accessible if you provide
+    ``{"connections_prefix": "airflow-connections"}`` and request conn_id ``my_db``.
+
+    The secret name is built as ``{prefix}-{key}`` where underscores in the key are
+    replaced with hyphens to conform to Kubernetes DNS naming requirements.
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace is auto-detected from the pod's service account metadata.
+
+    :param connections_prefix: Specifies the prefix of the secret to read to get Connections.
+        If set to None, requests for connections will not be sent to Kubernetes.
+    :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_prefix: Specifies the prefix of the secret to read to get Configurations.
+        If set to None, requests for configurations will not be sent to Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        connections_prefix: str | None = "airflow-connections",
+        variables_prefix: str | None = "airflow-variables",
+        config_prefix: str | None = "airflow-config",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.connections_prefix = connections_prefix
+        self.variables_prefix = variables_prefix
+        self.config_prefix = config_prefix
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Auto-detect namespace from the pod's service account metadata, falling back to 'default'."""
+        try:
+            return Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            return "default"

Review Comment:
   Nit: I would suggest adding this below `except FileNotFoundError`:
   
   `self.log.debug("Kubernetes namespace file not found; falling back to 'default' namespace")`
   
   This could be useful when investigating Kubernetes setup or configuration issues.
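
A minimal standalone sketch of the suggestion above, assuming a module-level logger in place of the `self.log` provided by `LoggingMixin`. The function name `detect_namespace` and its `namespace_file` parameter are hypothetical and exist only so the fallback can be exercised outside a pod; they are not part of the PR.

```python
import logging
from pathlib import Path

# Stand-in for the `self.log` that LoggingMixin would provide (assumption).
log = logging.getLogger(__name__)

# In-cluster path quoted in the diff above.
NAMESPACE_FILE = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")


def detect_namespace(namespace_file: Path = NAMESPACE_FILE) -> str:
    """Return the pod's namespace, falling back to 'default' if the file is missing."""
    try:
        return namespace_file.read_text().strip()
    except FileNotFoundError:
        # The suggested debug line: records that the fallback was taken.
        log.debug("Kubernetes namespace file not found; falling back to 'default' namespace")
        return "default"
```

With debug logging enabled, running this outside a cluster would leave a trace explaining why `default` was used rather than the expected namespace.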



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/secrets/kubernetes_secrets_backend.py:
##########
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing connections, variables, and configs from Kubernetes Secrets."""
+
+from __future__ import annotations
+
+import base64
+from functools import cached_property
+from pathlib import Path
+
+from kubernetes.client import ApiClient, CoreV1Api
+from kubernetes.client.exceptions import ApiException
+from kubernetes.config import load_incluster_config
+
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class KubernetesSecretsBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieve connections, variables, and configs from Kubernetes Secrets.
+
+    This backend reads secrets using a naming convention, enabling integration with
+    External Secrets Operator (ESO) or any tool that creates Kubernetes secrets with
+    a predictable naming scheme.
+
+    Configurable via ``airflow.cfg`` like so:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = airflow.providers.cncf.kubernetes.secrets.kubernetes_secrets_backend.KubernetesSecretsBackend
+        backend_kwargs = {"connections_prefix": "airflow-connections"}
+
+    For example, if the Kubernetes secret name is ``airflow-connections-my-db`` with a data key
+    ``value`` containing a connection URI, this would be accessible if you provide
+    ``{"connections_prefix": "airflow-connections"}`` and request conn_id ``my_db``.
+
+    The secret name is built as ``{prefix}-{key}`` where underscores in the key are
+    replaced with hyphens to conform to Kubernetes DNS naming requirements.
+
+    **Authentication:** Uses ``kubernetes.config.load_incluster_config()`` directly
+    for in-cluster authentication. Does not use KubernetesHook or any Airflow connection,
+    avoiding circular dependencies since this IS the secrets backend.
+    The namespace is auto-detected from the pod's service account metadata.
+
+    :param connections_prefix: Specifies the prefix of the secret to read to get Connections.
+        If set to None, requests for connections will not be sent to Kubernetes.
+    :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
+        If set to None, requests for variables will not be sent to Kubernetes.
+    :param config_prefix: Specifies the prefix of the secret to read to get Configurations.
+        If set to None, requests for configurations will not be sent to Kubernetes.
+    :param connections_data_key: The data key in the Kubernetes secret that holds the
+        connection value. Default: ``"value"``
+    :param variables_data_key: The data key in the Kubernetes secret that holds the
+        variable value. Default: ``"value"``
+    :param config_data_key: The data key in the Kubernetes secret that holds the
+        config value. Default: ``"value"``
+    """
+
+    def __init__(
+        self,
+        connections_prefix: str | None = "airflow-connections",
+        variables_prefix: str | None = "airflow-variables",
+        config_prefix: str | None = "airflow-config",
+        connections_data_key: str = "value",
+        variables_data_key: str = "value",
+        config_data_key: str = "value",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.connections_prefix = connections_prefix
+        self.variables_prefix = variables_prefix
+        self.config_prefix = config_prefix
+        self.connections_data_key = connections_data_key
+        self.variables_data_key = variables_data_key
+        self.config_data_key = config_data_key
+
+    @cached_property
+    def namespace(self) -> str:
+        """Auto-detect namespace from the pod's service account metadata, falling back to 'default'."""
+        try:
+            return Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read_text().strip()
+        except FileNotFoundError:
+            return "default"
+
+    @cached_property
+    def client(self) -> CoreV1Api:
+        """Lazy-init Kubernetes CoreV1Api client using in-cluster config directly."""
+        load_incluster_config()
+        return CoreV1Api(ApiClient())
+
+    def get_conn_value(self, conn_id: str, team_name: str | None = None) -> str | None:
+        """
+        Get serialized representation of Connection from a Kubernetes secret.
+
+        :param conn_id: connection id
+        :param team_name: Team name associated to the task trying to access the connection (if any)
+        """
+        if self.connections_prefix is None:
+            return None
+        return self._get_secret(self.connections_prefix, conn_id, self.connections_data_key)
+
+    def get_variable(self, key: str, team_name: str | None = None) -> str | None:
+        """
+        Get Airflow Variable from a Kubernetes secret.
+
+        :param key: Variable Key
+        :param team_name: Team name associated to the task trying to access the variable (if any)
+        :return: Variable Value
+        """
+        if self.variables_prefix is None:
+            return None
+        return self._get_secret(self.variables_prefix, key, self.variables_data_key)
+
+    def get_config(self, key: str) -> str | None:
+        """
+        Get Airflow Configuration from a Kubernetes secret.
+
+        :param key: Configuration Option Key
+        :return: Configuration Option Value
+        """
+        if self.config_prefix is None:
+            return None
+        return self._get_secret(self.config_prefix, key, self.config_data_key)
+
+    def _get_secret(self, prefix: str, key: str, data_key: str) -> str | None:
+        """
+        Get secret value from Kubernetes.
+
+        Builds the secret name as ``{prefix}-{key}``, sanitizes it for K8s DNS
+        compatibility (underscores to hyphens), reads the secret, and returns the
+        base64-decoded value for the specified data key.
+
+        :param prefix: Prefix for the secret name
+        :param key: Secret key (e.g. conn_id or variable key)
+        :param data_key: The key within the secret's data dict to read
+        :return: Secret value or None if not found
+        """
+        secret_name = self.build_path(prefix, key, "-")
+        # Sanitize for Kubernetes DNS naming: underscores to hyphens, lowercase
+        secret_name = secret_name.replace("_", "-").lower()

Review Comment:
   Are you sure this performs a comprehensive sanitization of `secret_name`? My understanding is that Secret names, like those of other namespaced Kubernetes resources, must fully conform to RFC 1123 validation. This includes additional constraints beyond replacing underscores with hyphens and lowercasing, which appear to be the only transformations applied here.
   
   Looking at the full implementation, it seems that the inputs for both `prefix` and `key` are not fully validated within Airflow, so we rely on users to provide values that map to valid RFC 1123 subdomain names. If they do not (for example, a `prefix` like "-abc" or a `key` that results in a trailing hyphen), the Kubernetes API will reject the request with a 422 error.
   
   I don’t think this is a critical issue with the implementation itself, but the current wording suggests that Airflow fully sanitizes secret names, which doesn’t seem accurate. It may be worth adjusting the docstrings/comments to clarify that only minimal normalization is performed and that full validation is delegated to Kubernetes.
   
   I do wonder why you chose this partial approach over full sanitization. I'm not asking you to change that here, but I am curious about the reason.
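
To illustrate the gap described above, here is a rough sketch that mirrors the two transformations in the quoted diff and checks the result against the DNS subdomain pattern Kubernetes documents for object names. The helper names `normalize_secret_name` and `is_valid_secret_name` are hypothetical, and joining `prefix` and `key` with a plain f-string is an assumption standing in for `build_path`; this is not a proposal for the PR itself.

```python
import re

# RFC 1123 DNS subdomain as documented for Kubernetes object names:
# dot-separated labels of lowercase alphanumerics and '-', each label
# starting and ending with an alphanumeric, at most 253 characters overall.
_LABEL = r"[a-z0-9]([-a-z0-9]*[a-z0-9])?"
_SUBDOMAIN_RE = re.compile(rf"{_LABEL}(\.{_LABEL})*")
_MAX_LEN = 253


def normalize_secret_name(prefix: str, key: str) -> str:
    """Mirror the minimal normalization in the diff: join, swap '_' for '-', lowercase."""
    return f"{prefix}-{key}".replace("_", "-").lower()


def is_valid_secret_name(name: str) -> bool:
    """Return True if `name` matches the DNS subdomain pattern and length limit."""
    return len(name) <= _MAX_LEN and _SUBDOMAIN_RE.fullmatch(name) is not None


# Inputs taken from the examples in the comment above.
for prefix, key in [
    ("airflow-connections", "my_db"),   # normalizes to a valid name
    ("-abc", "my_db"),                  # leading hyphen survives normalization
    ("airflow-connections", "my_db_"),  # trailing underscore becomes a trailing hyphen
]:
    name = normalize_secret_name(prefix, key)
    print(f"{name!r}: valid={is_valid_secret_name(name)}")
```

A check like this could either back a stricter sanitizer or simply inform the docstring wording, depending on whether validation is meant to stay delegated to Kubernetes.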



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
