Piatachock commented on code in PR #36449: URL: https://github.com/apache/airflow/pull/36449#discussion_r1452392318
########## airflow/providers/yandex/secrets/secrets_manager.py: ########## @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Objects relating to sourcing secrets from Yandex Cloud Lockbox.""" +from __future__ import annotations + +import logging +from functools import cached_property +from typing import Any + +import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb +import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb +import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as payload_service_pb_grpc +import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb +import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb +import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as secret_service_pb_grpc +import yandexcloud + +from airflow.models import Connection +from airflow.providers.yandex.utils.credentials import get_credentials +from airflow.providers.yandex.utils.defaults import default_conn_name +from airflow.providers.yandex.utils.fields import get_field_from_extras +from airflow.secrets import BaseSecretsBackend + + +class LockboxSecretBackend(BaseSecretsBackend): + """ + Retrieves Connection or Variables or Configs from Yandex Lockbox. 
+ + Configurable via ``airflow.cfg`` like so: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"connections_prefix": "airflow/connections"} + + For example, when ``{"connections_prefix": "airflow/connections"}`` is set, if a secret is defined with + the path ``airflow/connections/smtp_default``, the connection with conn_id ``smtp_default`` would be + accessible. + + When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is defined with + the path ``airflow/variables/hello``, the variable with the name ``hello`` would be accessible. + + When ``{"config_prefix": "airflow/config"}`` is set, if a secret is defined with + the path ``airflow/config/sql_alchemy_conn``, the config with key ``sql_alchemy_conn`` would be + accessible. + + When the prefix is empty, keys will use the Lockbox Secrets without any prefix. + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"yc_connection_id": "<connection_ID>", "folder_id": "<folder_ID>"} + + You need to specify credentials or id of yandexcloud connection to connect to Yandex Lockbox with. + Credentials will be used with this priority: + + * OAuth Token + * Service Account JSON file + * Service Account JSON + * Yandex Cloud Connection + + If no credentials specified, default connection id will be used. + + Also, you need to specify the Yandex Cloud folder ID to search for Yandex Lockbox secrets in. + + :param yc_oauth_token: Specifies the user account OAuth token to connect to Yandex Lockbox with. + Looks like ``y3_xxxxx``. + :param yc_sa_key_json: Specifies the service account auth JSON. + Looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_sa_key_json_path: Specifies the service account auth JSON file path. + Looks like ``/home/airflow/authorized_key.json``. 
+ File content looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_connection_id: Specifies the connection ID to connect to Yandex Lockbox with. + Default: "yandexcloud_default" + :param folder_id: Specifies the folder ID to search for Yandex Lockbox secrets in. + If set to None (null in JSON), requests will use the connection folder_id if specified. + :param connections_prefix: Specifies the prefix of the secret to read to get Connections. + If set to None (null in JSON), requests for connections will not be sent to Yandex Lockbox. + Default: "airflow/connections" + :param variables_prefix: Specifies the prefix of the secret to read to get Variables. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/variables" + :param config_prefix: Specifies the prefix of the secret to read to get Configurations. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/config" + :param sep: Specifies the separator used to concatenate secret_prefix and secret_id. 
+ Default: "/" + """ + + def __init__( + self, + yc_oauth_token: str | None = None, + yc_sa_key_json: str | None = None, + yc_sa_key_json_path: str | None = None, + yc_connection_id: str | None = None, + folder_id: str = "", + connections_prefix: str | None = "airflow/connections", + variables_prefix: str | None = "airflow/variables", + config_prefix: str | None = "airflow/config", + sep: str = "/", + ): + super().__init__() + + self.yc_oauth_token = yc_oauth_token + self.yc_sa_key_json = yc_sa_key_json + self.yc_sa_key_json_path = yc_sa_key_json_path + self.yc_connection_id = yc_connection_id or default_conn_name + self._use_connection = not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]) + + self.folder_id = folder_id + self.connections_prefix = connections_prefix.rstrip(sep) if connections_prefix is not None else None + self.variables_prefix = variables_prefix.rstrip(sep) if variables_prefix is not None else None + self.config_prefix = config_prefix.rstrip(sep) if config_prefix is not None else None + self.sep = sep + + def get_conn_value(self, conn_id: str) -> str | None: + """ + Retrieve from Secrets Backend a string value representing the Connection object. + + :param conn_id: Connection ID + :return: Connection Value + """ + if self.connections_prefix is None: + return None + + if self._use_connection: + if conn_id == self.yc_connection_id: + return None + + return self._get_secret_value(self.connections_prefix, conn_id) + + def get_variable(self, key: str) -> str | None: + """ + Return value for Airflow Variable. + + :param key: Variable Key + :return: Variable Value + """ + if self.variables_prefix is None: + return None + + return self._get_secret_value(self.variables_prefix, key) + + def get_config(self, key: str) -> str | None: + """ + Return value for Airflow Config Key. 
+ + :param key: Config Key + :return: Config Value + """ + if self.config_prefix is None: + return None + + return self._get_secret_value(self.config_prefix, key) + + @property Review Comment: we should cache SDK, TLS channel creation on each secret request is costly ########## airflow/providers/yandex/secrets/secrets_manager.py: ########## @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Objects relating to sourcing secrets from Yandex Cloud Lockbox.""" +from __future__ import annotations + +import logging +from functools import cached_property +from typing import Any + +import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb +import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb +import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as payload_service_pb_grpc +import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb +import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb +import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as secret_service_pb_grpc +import yandexcloud + +from airflow.models import Connection +from airflow.providers.yandex.utils.credentials import get_credentials +from airflow.providers.yandex.utils.defaults import default_conn_name +from airflow.providers.yandex.utils.fields import get_field_from_extras +from airflow.secrets import BaseSecretsBackend + + +class LockboxSecretBackend(BaseSecretsBackend): + """ + Retrieves Connection or Variables or Configs from Yandex Lockbox. + + Configurable via ``airflow.cfg`` like so: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"connections_prefix": "airflow/connections"} + + For example, when ``{"connections_prefix": "airflow/connections"}`` is set, if a secret is defined with + the path ``airflow/connections/smtp_default``, the connection with conn_id ``smtp_default`` would be + accessible. + + When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is defined with + the path ``airflow/variables/hello``, the variable with the name ``hello`` would be accessible. + + When ``{"config_prefix": "airflow/config"}`` is set, if a secret is defined with + the path ``airflow/config/sql_alchemy_conn``, the config with key ``sql_alchemy_conn`` would be + accessible. + + When the prefix is empty, keys will use the Lockbox Secrets without any prefix. 
+ + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"yc_connection_id": "<connection_ID>", "folder_id": "<folder_ID>"} + + You need to specify credentials or id of yandexcloud connection to connect to Yandex Lockbox with. + Credentials will be used with this priority: + + * OAuth Token + * Service Account JSON file + * Service Account JSON + * Yandex Cloud Connection + + If no credentials specified, default connection id will be used. + + Also, you need to specify the Yandex Cloud folder ID to search for Yandex Lockbox secrets in. + + :param yc_oauth_token: Specifies the user account OAuth token to connect to Yandex Lockbox with. + Looks like ``y3_xxxxx``. + :param yc_sa_key_json: Specifies the service account auth JSON. + Looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_sa_key_json_path: Specifies the service account auth JSON file path. + Looks like ``/home/airflow/authorized_key.json``. + File content looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_connection_id: Specifies the connection ID to connect to Yandex Lockbox with. + Default: "yandexcloud_default" + :param folder_id: Specifies the folder ID to search for Yandex Lockbox secrets in. + If set to None (null in JSON), requests will use the connection folder_id if specified. + :param connections_prefix: Specifies the prefix of the secret to read to get Connections. + If set to None (null in JSON), requests for connections will not be sent to Yandex Lockbox. + Default: "airflow/connections" + :param variables_prefix: Specifies the prefix of the secret to read to get Variables. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/variables" + :param config_prefix: Specifies the prefix of the secret to read to get Configurations. 
+ If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/config" + :param sep: Specifies the separator used to concatenate secret_prefix and secret_id. + Default: "/" + """ + + def __init__( + self, + yc_oauth_token: str | None = None, + yc_sa_key_json: str | None = None, + yc_sa_key_json_path: str | None = None, + yc_connection_id: str | None = None, + folder_id: str = "", + connections_prefix: str | None = "airflow/connections", + variables_prefix: str | None = "airflow/variables", + config_prefix: str | None = "airflow/config", + sep: str = "/", + ): + super().__init__() + + self.yc_oauth_token = yc_oauth_token + self.yc_sa_key_json = yc_sa_key_json + self.yc_sa_key_json_path = yc_sa_key_json_path + self.yc_connection_id = yc_connection_id or default_conn_name + self._use_connection = not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]) + + self.folder_id = folder_id + self.connections_prefix = connections_prefix.rstrip(sep) if connections_prefix is not None else None + self.variables_prefix = variables_prefix.rstrip(sep) if variables_prefix is not None else None + self.config_prefix = config_prefix.rstrip(sep) if config_prefix is not None else None + self.sep = sep + + def get_conn_value(self, conn_id: str) -> str | None: + """ + Retrieve from Secrets Backend a string value representing the Connection object. + + :param conn_id: Connection ID + :return: Connection Value + """ + if self.connections_prefix is None: + return None + + if self._use_connection: + if conn_id == self.yc_connection_id: + return None + + return self._get_secret_value(self.connections_prefix, conn_id) + + def get_variable(self, key: str) -> str | None: + """ + Return value for Airflow Variable. 
+ + :param key: Variable Key + :return: Variable Value + """ + if self.variables_prefix is None: + return None + + return self._get_secret_value(self.variables_prefix, key) + + def get_config(self, key: str) -> str | None: + """ + Return value for Airflow Config Key. + + :param key: Config Key + :return: Config Value + """ + if self.config_prefix is None: + return None + + return self._get_secret_value(self.config_prefix, key) + + @property + def _client(self): + """Create a Yandex Cloud SDK client.""" + if self._use_connection: + self.yc_oauth_token = self._get_field("oauth") + self.yc_sa_key_json = self._get_field("service_account_json") + self.yc_sa_key_json_path = self._get_field("service_account_json_path") + self.folder_id = self.folder_id or self._get_field("folder_id") + + credentials = get_credentials( + oauth_token=self.yc_oauth_token, + service_account_json=self.yc_sa_key_json, + service_account_json_path=self.yc_sa_key_json_path, + ) + return yandexcloud.SDK(**credentials).client Review Comment: Missed `endpoint` and `user_agent` parameters. IMO: * `endpoint` should be part of `SecretsBackend` interface * `user_agent` could be too, or could be just a static string like in Hook. Not so important ########## airflow/providers/yandex/secrets/secrets_manager.py: ########## @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Objects relating to sourcing secrets from Yandex Cloud Lockbox.""" +from __future__ import annotations + +import logging +from functools import cached_property +from typing import Any + +import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb +import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb +import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as payload_service_pb_grpc +import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb +import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb +import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as secret_service_pb_grpc +import yandexcloud + +from airflow.models import Connection +from airflow.providers.yandex.utils.credentials import get_credentials +from airflow.providers.yandex.utils.defaults import default_conn_name +from airflow.providers.yandex.utils.fields import get_field_from_extras +from airflow.secrets import BaseSecretsBackend + + +class LockboxSecretBackend(BaseSecretsBackend): + """ + Retrieves Connection or Variables or Configs from Yandex Lockbox. + + Configurable via ``airflow.cfg`` like so: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"connections_prefix": "airflow/connections"} + + For example, when ``{"connections_prefix": "airflow/connections"}`` is set, if a secret is defined with + the path ``airflow/connections/smtp_default``, the connection with conn_id ``smtp_default`` would be + accessible. 
+ + When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is defined with + the path ``airflow/variables/hello``, the variable with the name ``hello`` would be accessible. + + When ``{"config_prefix": "airflow/config"}`` is set, if a secret is defined with + the path ``airflow/config/sql_alchemy_conn``, the config with key ``sql_alchemy_conn`` would be + accessible. + + When the prefix is empty, keys will use the Lockbox Secrets without any prefix. + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"yc_connection_id": "<connection_ID>", "folder_id": "<folder_ID>"} + + You need to specify credentials or id of yandexcloud connection to connect to Yandex Lockbox with. + Credentials will be used with this priority: + + * OAuth Token + * Service Account JSON file + * Service Account JSON + * Yandex Cloud Connection + + If no credentials specified, default connection id will be used. + + Also, you need to specify the Yandex Cloud folder ID to search for Yandex Lockbox secrets in. + + :param yc_oauth_token: Specifies the user account OAuth token to connect to Yandex Lockbox with. + Looks like ``y3_xxxxx``. + :param yc_sa_key_json: Specifies the service account auth JSON. + Looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_sa_key_json_path: Specifies the service account auth JSON file path. + Looks like ``/home/airflow/authorized_key.json``. + File content looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_connection_id: Specifies the connection ID to connect to Yandex Lockbox with. + Default: "yandexcloud_default" + :param folder_id: Specifies the folder ID to search for Yandex Lockbox secrets in. + If set to None (null in JSON), requests will use the connection folder_id if specified. + :param connections_prefix: Specifies the prefix of the secret to read to get Connections. 
+ If set to None (null in JSON), requests for connections will not be sent to Yandex Lockbox. + Default: "airflow/connections" + :param variables_prefix: Specifies the prefix of the secret to read to get Variables. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/variables" + :param config_prefix: Specifies the prefix of the secret to read to get Configurations. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/config" + :param sep: Specifies the separator used to concatenate secret_prefix and secret_id. + Default: "/" + """ + + def __init__( + self, + yc_oauth_token: str | None = None, + yc_sa_key_json: str | None = None, + yc_sa_key_json_path: str | None = None, + yc_connection_id: str | None = None, + folder_id: str = "", + connections_prefix: str | None = "airflow/connections", + variables_prefix: str | None = "airflow/variables", + config_prefix: str | None = "airflow/config", + sep: str = "/", + ): + super().__init__() + + self.yc_oauth_token = yc_oauth_token + self.yc_sa_key_json = yc_sa_key_json + self.yc_sa_key_json_path = yc_sa_key_json_path + self.yc_connection_id = yc_connection_id or default_conn_name + self._use_connection = not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]) + + self.folder_id = folder_id + self.connections_prefix = connections_prefix.rstrip(sep) if connections_prefix is not None else None + self.variables_prefix = variables_prefix.rstrip(sep) if variables_prefix is not None else None + self.config_prefix = config_prefix.rstrip(sep) if config_prefix is not None else None + self.sep = sep + + def get_conn_value(self, conn_id: str) -> str | None: + """ + Retrieve from Secrets Backend a string value representing the Connection object. 
+ + :param conn_id: Connection ID + :return: Connection Value + """ + if self.connections_prefix is None: + return None + + if self._use_connection: Review Comment: nit: excessive check IMO. `if conn_id == self.yc_connection_id:` alone should work just fine when `self.yc_connection_id is None`. Just don't assign `default_conn_name` to `self.yc_connection_id` if it is not meant to be used. ########## airflow/providers/yandex/secrets/secrets_manager.py: ########## @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+"""Objects relating to sourcing secrets from Yandex Cloud Lockbox.""" +from __future__ import annotations + +import logging +from functools import cached_property +from typing import Any + +import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb +import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb +import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as payload_service_pb_grpc +import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb +import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb +import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as secret_service_pb_grpc +import yandexcloud + +from airflow.models import Connection +from airflow.providers.yandex.utils.credentials import get_credentials +from airflow.providers.yandex.utils.defaults import default_conn_name +from airflow.providers.yandex.utils.fields import get_field_from_extras +from airflow.secrets import BaseSecretsBackend + + +class LockboxSecretBackend(BaseSecretsBackend): + """ + Retrieves Connection or Variables or Configs from Yandex Lockbox. + + Configurable via ``airflow.cfg`` like so: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"connections_prefix": "airflow/connections"} + + For example, when ``{"connections_prefix": "airflow/connections"}`` is set, if a secret is defined with + the path ``airflow/connections/smtp_default``, the connection with conn_id ``smtp_default`` would be + accessible. + + When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is defined with + the path ``airflow/variables/hello``, the variable with the name ``hello`` would be accessible. + + When ``{"config_prefix": "airflow/config"}`` is set, if a secret is defined with + the path ``airflow/config/sql_alchemy_conn``, the config with key ``sql_alchemy_conn`` would be + accessible. + + When the prefix is empty, keys will use the Lockbox Secrets without any prefix. 
+ + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"yc_connection_id": "<connection_ID>", "folder_id": "<folder_ID>"} + + You need to specify credentials or id of yandexcloud connection to connect to Yandex Lockbox with. + Credentials will be used with this priority: + + * OAuth Token + * Service Account JSON file + * Service Account JSON + * Yandex Cloud Connection + + If no credentials specified, default connection id will be used. + + Also, you need to specify the Yandex Cloud folder ID to search for Yandex Lockbox secrets in. + + :param yc_oauth_token: Specifies the user account OAuth token to connect to Yandex Lockbox with. + Looks like ``y3_xxxxx``. + :param yc_sa_key_json: Specifies the service account auth JSON. + Looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_sa_key_json_path: Specifies the service account auth JSON file path. + Looks like ``/home/airflow/authorized_key.json``. + File content looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_connection_id: Specifies the connection ID to connect to Yandex Lockbox with. + Default: "yandexcloud_default" + :param folder_id: Specifies the folder ID to search for Yandex Lockbox secrets in. + If set to None (null in JSON), requests will use the connection folder_id if specified. + :param connections_prefix: Specifies the prefix of the secret to read to get Connections. + If set to None (null in JSON), requests for connections will not be sent to Yandex Lockbox. + Default: "airflow/connections" + :param variables_prefix: Specifies the prefix of the secret to read to get Variables. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/variables" + :param config_prefix: Specifies the prefix of the secret to read to get Configurations. 
+ If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/config" + :param sep: Specifies the separator used to concatenate secret_prefix and secret_id. + Default: "/" + """ + + def __init__( + self, + yc_oauth_token: str | None = None, + yc_sa_key_json: str | None = None, + yc_sa_key_json_path: str | None = None, + yc_connection_id: str | None = None, + folder_id: str = "", + connections_prefix: str | None = "airflow/connections", + variables_prefix: str | None = "airflow/variables", + config_prefix: str | None = "airflow/config", + sep: str = "/", + ): + super().__init__() + + self.yc_oauth_token = yc_oauth_token + self.yc_sa_key_json = yc_sa_key_json + self.yc_sa_key_json_path = yc_sa_key_json_path + self.yc_connection_id = yc_connection_id or default_conn_name Review Comment: nit: ``` if not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]): self.yc_connection_id = yc_connection_id or default_conn_name else: assert yc_connection_id is None, "..." ``` This way you won't need `self._use_connection` ########## airflow/providers/yandex/secrets/secrets_manager.py: ########## @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Objects relating to sourcing secrets from Yandex Cloud Lockbox.""" +from __future__ import annotations + +import logging +from functools import cached_property +from typing import Any + +import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb +import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb +import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as payload_service_pb_grpc +import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb +import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb +import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as secret_service_pb_grpc +import yandexcloud + +from airflow.models import Connection +from airflow.providers.yandex.utils.credentials import get_credentials +from airflow.providers.yandex.utils.defaults import default_conn_name +from airflow.providers.yandex.utils.fields import get_field_from_extras +from airflow.secrets import BaseSecretsBackend + + +class LockboxSecretBackend(BaseSecretsBackend): + """ + Retrieves Connection or Variables or Configs from Yandex Lockbox. + + Configurable via ``airflow.cfg`` like so: + + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"connections_prefix": "airflow/connections"} + + For example, when ``{"connections_prefix": "airflow/connections"}`` is set, if a secret is defined with + the path ``airflow/connections/smtp_default``, the connection with conn_id ``smtp_default`` would be + accessible. + + When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is defined with + the path ``airflow/variables/hello``, the variable with the name ``hello`` would be accessible. + + When ``{"config_prefix": "airflow/config"}`` is set, if a secret is defined with + the path ``airflow/config/sql_alchemy_conn``, the config with key ``sql_alchemy_conn`` would be + accessible. + + When the prefix is empty, keys will use the Lockbox Secrets without any prefix. 
+ + .. code-block:: ini + + [secrets] + backend = airflow.providers.yandex.secrets.secrets_manager.SecretsManagerBackend + backend_kwargs = {"yc_connection_id": "<connection_ID>", "folder_id": "<folder_ID>"} + + You need to specify credentials or id of yandexcloud connection to connect to Yandex Lockbox with. + Credentials will be used with this priority: + + * OAuth Token + * Service Account JSON file + * Service Account JSON + * Yandex Cloud Connection + + If no credentials specified, default connection id will be used. + + Also, you need to specify the Yandex Cloud folder ID to search for Yandex Lockbox secrets in. + + :param yc_oauth_token: Specifies the user account OAuth token to connect to Yandex Lockbox with. + Looks like ``y3_xxxxx``. + :param yc_sa_key_json: Specifies the service account auth JSON. + Looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_sa_key_json_path: Specifies the service account auth JSON file path. + Looks like ``/home/airflow/authorized_key.json``. + File content looks like ``{"id": "...", "service_account_id": "...", "private_key": "..."}``. + :param yc_connection_id: Specifies the connection ID to connect to Yandex Lockbox with. + Default: "yandexcloud_default" + :param folder_id: Specifies the folder ID to search for Yandex Lockbox secrets in. + If set to None (null in JSON), requests will use the connection folder_id if specified. + :param connections_prefix: Specifies the prefix of the secret to read to get Connections. + If set to None (null in JSON), requests for connections will not be sent to Yandex Lockbox. + Default: "airflow/connections" + :param variables_prefix: Specifies the prefix of the secret to read to get Variables. + If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/variables" + :param config_prefix: Specifies the prefix of the secret to read to get Configurations. 
+ If set to None (null in JSON), requests for variables will not be sent to Yandex Lockbox. + Default: "airflow/config" + :param sep: Specifies the separator used to concatenate secret_prefix and secret_id. + Default: "/" + """ + + def __init__( + self, + yc_oauth_token: str | None = None, + yc_sa_key_json: str | None = None, + yc_sa_key_json_path: str | None = None, + yc_connection_id: str | None = None, + folder_id: str = "", + connections_prefix: str | None = "airflow/connections", + variables_prefix: str | None = "airflow/variables", + config_prefix: str | None = "airflow/config", + sep: str = "/", + ): + super().__init__() + + self.yc_oauth_token = yc_oauth_token + self.yc_sa_key_json = yc_sa_key_json + self.yc_sa_key_json_path = yc_sa_key_json_path + self.yc_connection_id = yc_connection_id or default_conn_name + self._use_connection = not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]) + + self.folder_id = folder_id + self.connections_prefix = connections_prefix.rstrip(sep) if connections_prefix is not None else None + self.variables_prefix = variables_prefix.rstrip(sep) if variables_prefix is not None else None + self.config_prefix = config_prefix.rstrip(sep) if config_prefix is not None else None + self.sep = sep + + def get_conn_value(self, conn_id: str) -> str | None: + """ + Retrieve from Secrets Backend a string value representing the Connection object. + + :param conn_id: Connection ID + :return: Connection Value + """ + if self.connections_prefix is None: + return None + + if self._use_connection: + if conn_id == self.yc_connection_id: + return None + + return self._get_secret_value(self.connections_prefix, conn_id) + + def get_variable(self, key: str) -> str | None: + """ + Return value for Airflow Variable. 
+ + :param key: Variable Key + :return: Variable Value + """ + if self.variables_prefix is None: + return None + + return self._get_secret_value(self.variables_prefix, key) + + def get_config(self, key: str) -> str | None: + """ + Return value for Airflow Config Key. + + :param key: Config Key + :return: Config Value + """ + if self.config_prefix is None: + return None + + return self._get_secret_value(self.config_prefix, key) + + @property + def _client(self): + """Create a Yandex Cloud SDK client.""" + if self._use_connection: Review Comment: The reason for this part of the initialization happening here and not in `__init__` is confusing. Is the Connection object not available when `LockboxSecretManager.__init__` is called? If that's the case, I'd add an explicit comment to clarify the intent of this lazy initialization. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
