This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 12ccb5f0ac feat: add Yandex Cloud Lockbox secrets backend (#36449)
12ccb5f0ac is described below
commit 12ccb5f0ac34007b6cbea4f6a6d9cc6811d71268
Author: Vadim Vladimirov
<[email protected]>
AuthorDate: Thu Jan 25 14:10:00 2024 +0300
feat: add Yandex Cloud Lockbox secrets backend (#36449)
* refactor: move credentials logic to utils
* docs: using metadata service in Yandex.Cloud Connection
* feat: Yandex Cloud Lockbox Secret Backend
* docs: Yandex LockboxSecretBackend
---
airflow/providers/yandex/hooks/yandex.py | 122 ++----
airflow/providers/yandex/secrets/__init__.py | 16 +
airflow/providers/yandex/secrets/lockbox.py | 280 +++++++++++++
airflow/providers/yandex/utils/__init__.py | 16 +
airflow/providers/yandex/utils/credentials.py | 100 +++++
airflow/providers/yandex/utils/defaults.py | 22 ++
airflow/providers/yandex/utils/fields.py | 42 ++
airflow/providers/yandex/utils/user_agent.py | 48 +++
.../connections/yandexcloud.rst | 67 +++-
docs/apache-airflow-providers-yandex/index.rst | 1 +
.../yandex-cloud-lockbox-secret-backend.rst | 293 ++++++++++++++
tests/providers/yandex/hooks/test_yandex.py | 152 +++----
.../yandex/hooks/test_yandexcloud_dataproc.py | 56 ++-
.../yandex/operators/test_yandexcloud_dataproc.py | 38 +-
tests/providers/yandex/secrets/__init__.py | 16 +
tests/providers/yandex/secrets/test_lockbox.py | 435 +++++++++++++++++++++
tests/providers/yandex/utils/__init__.py | 16 +
tests/providers/yandex/utils/test_credentials.py | 168 ++++++++
tests/providers/yandex/utils/test_defaults.py | 16 +
tests/providers/yandex/utils/test_fields.py | 83 ++++
tests/providers/yandex/utils/test_user_agent.py | 52 +++
21 files changed, 1816 insertions(+), 223 deletions(-)
diff --git a/airflow/providers/yandex/hooks/yandex.py
b/airflow/providers/yandex/hooks/yandex.py
index 02bf037ae5..67afc2d59f 100644
--- a/airflow/providers/yandex/hooks/yandex.py
+++ b/airflow/providers/yandex/hooks/yandex.py
@@ -16,27 +16,37 @@
# under the License.
from __future__ import annotations
-import json
import warnings
from typing import Any
import yandexcloud
-from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.hooks.base import BaseHook
+from airflow.providers.yandex.utils.credentials import (
+ get_credentials,
+ get_service_account_id,
+)
+from airflow.providers.yandex.utils.defaults import conn_name_attr, conn_type,
default_conn_name, hook_name
+from airflow.providers.yandex.utils.fields import get_field_from_extras
+from airflow.providers.yandex.utils.user_agent import provider_user_agent
class YandexCloudBaseHook(BaseHook):
"""
A base hook for Yandex.Cloud related tasks.
- :param yandex_conn_id: The connection ID to use when fetching connection
info.
+ :param yandex_conn_id: The connection ID to use when fetching connection
info
+ :param connection_id: Deprecated, use yandex_conn_id instead
+ :param default_folder_id: The folder ID to use instead of connection
folder ID
+ :param default_public_ssh_key: The key to use instead of connection key
+ :param default_service_account_id: The service account ID to use instead
of key service account ID
"""
- conn_name_attr = "yandex_conn_id"
- default_conn_name = "yandexcloud_default"
- conn_type = "yandexcloud"
- hook_name = "Yandex Cloud"
+ conn_name_attr = conn_name_attr
+ default_conn_name = default_conn_name
+ conn_type = conn_type
+ hook_name = hook_name
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
@@ -50,14 +60,14 @@ class YandexCloudBaseHook(BaseHook):
lazy_gettext("Service account auth JSON"),
widget=BS3PasswordFieldWidget(),
description="Service account auth JSON. Looks like "
- '{"id", "...", "service_account_id": "...", "private_key":
"..."}. '
+ '{"id": "...", "service_account_id": "...", "private_key":
"..."}. '
"Will be used instead of OAuth token and SA JSON file path
field if specified.",
),
"service_account_json_path": StringField(
lazy_gettext("Service account auth JSON file path"),
widget=BS3TextFieldWidget(),
description="Service account auth JSON file path. File content
looks like "
- '{"id", "...", "service_account_id": "...", "private_key":
"..."}. '
+ '{"id": "...", "service_account_id": "...", "private_key":
"..."}. '
"Will be used instead of OAuth token if specified.",
),
"oauth": PasswordField(
@@ -75,7 +85,7 @@ class YandexCloudBaseHook(BaseHook):
"public_ssh_key": StringField(
lazy_gettext("Public SSH key"),
widget=BS3TextFieldWidget(),
- description="Optional. This key will be placed to all created
Compute nodes"
+ description="Optional. This key will be placed to all created
Compute nodes "
"to let you have a root shell there",
),
"endpoint": StringField(
@@ -87,30 +97,13 @@ class YandexCloudBaseHook(BaseHook):
@classmethod
def provider_user_agent(cls) -> str | None:
- """Construct User-Agent from Airflow core & provider package
versions."""
- from airflow import __version__ as airflow_version
- from airflow.configuration import conf
- from airflow.providers_manager import ProvidersManager
-
- try:
- manager = ProvidersManager()
- provider_name = manager.hooks[cls.conn_type].package_name # type:
ignore[union-attr]
- provider = manager.providers[provider_name]
- return " ".join(
- (
- conf.get("yandex", "sdk_user_agent_prefix", fallback=""),
- f"apache-airflow/{airflow_version}",
- f"{provider_name}/{provider.version}",
- )
- ).strip()
- except KeyError:
- warnings.warn(
- f"Hook '{cls.hook_name}' info is not initialized in
airflow.ProviderManager",
- UserWarning,
- stacklevel=2,
- )
-
- return None
+ warnings.warn(
+ "Using `provider_user_agent` in `YandexCloudBaseHook` is
deprecated. "
+ "Please use it in `utils.user_agent` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ return provider_user_agent()
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
@@ -122,7 +115,7 @@ class YandexCloudBaseHook(BaseHook):
def __init__(
self,
- # Connection id is deprecated. Use yandex_conn_id instead
+ # connection_id is deprecated, use yandex_conn_id instead
connection_id: str | None = None,
yandex_conn_id: str | None = None,
default_folder_id: str | None = None,
@@ -137,46 +130,23 @@ class YandexCloudBaseHook(BaseHook):
AirflowProviderDeprecationWarning,
stacklevel=2,
)
- self.connection_id = yandex_conn_id or connection_id or
self.default_conn_name
+ self.connection_id = yandex_conn_id or connection_id or
default_conn_name
self.connection = self.get_connection(self.connection_id)
self.extras = self.connection.extra_dejson
- credentials = self._get_credentials()
+ credentials = get_credentials(
+ oauth_token=self._get_field("oauth"),
+ service_account_json=self._get_field("service_account_json"),
+
service_account_json_path=self._get_field("service_account_json_path"),
+ )
sdk_config = self._get_endpoint()
- self.sdk = yandexcloud.SDK(user_agent=self.provider_user_agent(),
**sdk_config, **credentials)
+ self.sdk = yandexcloud.SDK(user_agent=provider_user_agent(),
**sdk_config, **credentials)
self.default_folder_id = default_folder_id or
self._get_field("folder_id")
self.default_public_ssh_key = default_public_ssh_key or
self._get_field("public_ssh_key")
- self.default_service_account_id = default_service_account_id or
self._get_service_account_id()
- self.client = self.sdk.client
-
- def _get_service_account_key(self) -> dict[str, str] | None:
- service_account_json = self._get_field("service_account_json")
- service_account_json_path =
self._get_field("service_account_json_path")
- if service_account_json_path:
- with open(service_account_json_path) as infile:
- service_account_json = infile.read()
- if service_account_json:
- return json.loads(service_account_json)
- return None
-
- def _get_service_account_id(self) -> str | None:
- sa_key = self._get_service_account_key()
- if sa_key:
- return sa_key.get("service_account_id")
- return None
-
- def _get_credentials(self) -> dict[str, Any]:
- oauth_token = self._get_field("oauth")
- if oauth_token:
- return {"token": oauth_token}
-
- service_account_key = self._get_service_account_key()
- if service_account_key:
- return {"service_account_key": service_account_key}
-
- raise AirflowException(
- "No credentials are found in connection. Specify either service
account "
- "authentication JSON or user OAuth token in Yandex.Cloud
connection"
+ self.default_service_account_id = default_service_account_id or
get_service_account_id(
+ service_account_json=self._get_field("service_account_json"),
+
service_account_json_path=self._get_field("service_account_json_path"),
)
+ self.client = self.sdk.client
def _get_endpoint(self) -> dict[str, str]:
sdk_config = {}
@@ -186,18 +156,6 @@ class YandexCloudBaseHook(BaseHook):
return sdk_config
def _get_field(self, field_name: str, default: Any = None) -> Any:
- """Get field from extra, first checking short name, then for
backcompat we check for prefixed name."""
if not hasattr(self, "extras"):
return default
- backcompat_prefix = "extra__yandexcloud__"
- if field_name.startswith("extra__"):
- raise ValueError(
- f"Got prefixed name {field_name}; please remove the
'{backcompat_prefix}' prefix "
- "when using this method."
- )
- if field_name in self.extras:
- return self.extras[field_name]
- prefixed_name = f"{backcompat_prefix}{field_name}"
- if prefixed_name in self.extras:
- return self.extras[prefixed_name]
- return default
+ return get_field_from_extras(self.extras, field_name, default)
diff --git a/airflow/providers/yandex/secrets/__init__.py
b/airflow/providers/yandex/secrets/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/yandex/secrets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/yandex/secrets/lockbox.py
b/airflow/providers/yandex/secrets/lockbox.py
new file mode 100644
index 0000000000..adbf994873
--- /dev/null
+++ b/airflow/providers/yandex/secrets/lockbox.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing secrets from Yandex Cloud Lockbox."""
+from __future__ import annotations
+
+from functools import cached_property
+from typing import Any
+
+import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb
+import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb
+import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as
payload_service_pb_grpc
+import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb
+import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb
+import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as
secret_service_pb_grpc
+import yandexcloud
+
+from airflow.models import Connection
+from airflow.providers.yandex.utils.credentials import get_credentials
+from airflow.providers.yandex.utils.defaults import default_conn_name
+from airflow.providers.yandex.utils.fields import get_field_from_extras
+from airflow.providers.yandex.utils.user_agent import provider_user_agent
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class LockboxSecretBackend(BaseSecretsBackend, LoggingMixin):
+ """
+ Retrieves Connection or Variables or Configs from Yandex Lockbox.
+
+ Configurable via ``airflow.cfg`` like so:
+
+ .. code-block:: ini
+
+ [secrets]
+ backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+ backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+ For example, when ``{"connections_prefix": "airflow/connections"}`` is
set, if a secret is defined with
+ the path ``airflow/connections/smtp_default``, the connection with conn_id
``smtp_default`` would be
+ accessible.
+
+ When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is
defined with
+ the path ``airflow/variables/hello``, the variable with the name ``hello``
would be accessible.
+
+ When ``{"config_prefix": "airflow/config"}`` is set, if a secret is
defined with
+ the path ``airflow/config/sql_alchemy_conn``, the config with key
``sql_alchemy_conn`` would be
+ accessible.
+
+ When the prefix is empty, keys will use the Lockbox Secrets without any
prefix.
+
+ .. code-block:: ini
+
+ [secrets]
+ backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+ backend_kwargs = {"yc_connection_id": "<connection_ID>", "folder_id":
"<folder_ID>"}
+
+ You need to specify credentials or id of yandexcloud connection to connect
to Yandex Lockbox with.
+ Credentials will be used with this priority:
+
+ * OAuth Token
+ * Service Account JSON file
+ * Service Account JSON
+ * Yandex Cloud Connection
+
+ If no credentials specified, default connection id will be used.
+
+ Also, you need to specify the Yandex Cloud folder ID to search for Yandex
Lockbox secrets in.
+
+ :param yc_oauth_token: Specifies the user account OAuth token to connect
to Yandex Lockbox with.
+ Looks like ``y3_xxxxx``.
+ :param yc_sa_key_json: Specifies the service account auth JSON.
+ Looks like ``{"id": "...", "service_account_id": "...", "private_key":
"..."}``.
+ :param yc_sa_key_json_path: Specifies the service account auth JSON file
path.
+ Looks like ``/home/airflow/authorized_key.json``.
+ File content looks like ``{"id": "...", "service_account_id": "...",
"private_key": "..."}``.
+ :param yc_connection_id: Specifies the connection ID to connect to Yandex
Lockbox with.
+ Default: "yandexcloud_default"
+ :param folder_id: Specifies the folder ID to search for Yandex Lockbox
secrets in.
+ If set to None (null in JSON), requests will use the connection
folder_id if specified.
+ :param connections_prefix: Specifies the prefix of the secret to read to
get Connections.
+ If set to None (null in JSON), requests for connections will not be
sent to Yandex Lockbox.
+ Default: "airflow/connections"
+ :param variables_prefix: Specifies the prefix of the secret to read to get
Variables.
+ If set to None (null in JSON), requests for variables will not be sent
to Yandex Lockbox.
+ Default: "airflow/variables"
+ :param config_prefix: Specifies the prefix of the secret to read to get
Configurations.
+ If set to None (null in JSON), requests for variables will not be sent
to Yandex Lockbox.
+ Default: "airflow/config"
+ :param sep: Specifies the separator used to concatenate secret_prefix and
secret_id.
+ Default: "/"
+ :param endpoint: Specifies an API endpoint.
+ Leave blank to use default.
+ """
+
+ def __init__(
+ self,
+ yc_oauth_token: str | None = None,
+ yc_sa_key_json: dict | str | None = None,
+ yc_sa_key_json_path: str | None = None,
+ yc_connection_id: str | None = None,
+ folder_id: str = "",
+ connections_prefix: str | None = "airflow/connections",
+ variables_prefix: str | None = "airflow/variables",
+ config_prefix: str | None = "airflow/config",
+ sep: str = "/",
+ endpoint: str | None = None,
+ ):
+ super().__init__()
+
+ self.yc_oauth_token = yc_oauth_token
+ self.yc_sa_key_json = yc_sa_key_json
+ self.yc_sa_key_json_path = yc_sa_key_json_path
+ self.yc_connection_id = None
+ if not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]):
+ self.yc_connection_id = yc_connection_id or default_conn_name
+ else:
+ assert (
+ yc_connection_id is None
+ ), "yc_connection_id should not be used if other credentials are
specified"
+
+ self.folder_id = folder_id
+ self.connections_prefix = connections_prefix.rstrip(sep) if
connections_prefix is not None else None
+ self.variables_prefix = variables_prefix.rstrip(sep) if
variables_prefix is not None else None
+ self.config_prefix = config_prefix.rstrip(sep) if config_prefix is not
None else None
+ self.sep = sep
+ self.endpoint = endpoint
+
+ def get_conn_value(self, conn_id: str) -> str | None:
+ """
+ Retrieve from Secrets Backend a string value representing the
Connection object.
+
+ :param conn_id: Connection ID
+ :return: Connection Value
+ """
+ if self.connections_prefix is None:
+ return None
+
+ if conn_id == self.yc_connection_id:
+ return None
+
+ return self._get_secret_value(self.connections_prefix, conn_id)
+
+ def get_variable(self, key: str) -> str | None:
+ """
+ Return value for Airflow Variable.
+
+ :param key: Variable Key
+ :return: Variable Value
+ """
+ if self.variables_prefix is None:
+ return None
+
+ return self._get_secret_value(self.variables_prefix, key)
+
+ def get_config(self, key: str) -> str | None:
+ """
+ Return value for Airflow Config Key.
+
+ :param key: Config Key
+ :return: Config Value
+ """
+ if self.config_prefix is None:
+ return None
+
+ return self._get_secret_value(self.config_prefix, key)
+
+ @cached_property
+ def _client(self):
+ """
+ Create a Yandex Cloud SDK client.
+
+ Lazy loading is used here
+ because we can't establish a Connection until all secrets backends
have been initialized.
+ """
+ if self.yc_connection_id:
+ self.yc_oauth_token = self._get_field("oauth")
+ self.yc_sa_key_json = self._get_field("service_account_json")
+ self.yc_sa_key_json_path =
self._get_field("service_account_json_path")
+ self.folder_id = self.folder_id or self._get_field("folder_id")
+
+ credentials = get_credentials(
+ oauth_token=self.yc_oauth_token,
+ service_account_json=self.yc_sa_key_json,
+ service_account_json_path=self.yc_sa_key_json_path,
+ )
+ sdk_config = self._get_endpoint()
+ return yandexcloud.SDK(user_agent=provider_user_agent(),
**credentials, **sdk_config).client
+
+ def _get_endpoint(self) -> dict[str, str]:
+ sdk_config = {}
+
+ if self.endpoint:
+ sdk_config["endpoint"] = self.endpoint
+
+ return sdk_config
+
+ @cached_property
+ def _connection(self) -> Connection | None:
+ if not self.yc_connection_id:
+ return None
+
+ conn = Connection.get_connection_from_secrets(self.yc_connection_id)
+ self.log.info("Using connection ID '%s' for task execution.",
conn.conn_id)
+
+ return conn
+
+ def _get_field(self, field_name: str, default: Any = None) -> Any:
+ conn = self._connection
+ if not conn:
+ return None
+
+ return get_field_from_extras(
+ extras=conn.extra_dejson,
+ field_name=field_name,
+ default=default,
+ )
+
+ def _build_secret_name(self, prefix: str, key: str):
+ if len(prefix) == 0:
+ return key
+ return f"{prefix}{self.sep}{key}"
+
+ def _get_secret_value(self, prefix: str, key: str) -> str | None:
+ secret: secret_pb.Secret = None
+ for s in self._get_secrets():
+ if s.name == self._build_secret_name(prefix=prefix, key=key):
+ secret = s
+ break
+ if not secret:
+ return None
+
+ payload = self._get_payload(secret.id, secret.current_version.id)
+ entries = {entry.key: entry.text_value for entry in payload.entries if
entry.text_value}
+
+ if len(entries) == 0:
+ return None
+ return sorted(entries.values())[0]
+
+ def _get_secrets(self) -> list[secret_pb.Secret]:
+ response = self._list_secrets(folder_id=self.folder_id)
+
+ secrets: list[secret_pb.Secret] = response.secrets[:]
+ next_page_token = response.next_page_token
+ while next_page_token != "":
+ response = self._list_secrets(
+ folder_id=self.folder_id,
+ page_token=next_page_token,
+ )
+ secrets.extend(response.secrets)
+ next_page_token = response.next_page_token
+
+ return secrets
+
+ def _get_payload(self, secret_id: str, version_id: str) ->
payload_pb.Payload:
+ request = payload_service_pb.GetPayloadRequest(
+ secret_id=secret_id,
+ version_id=version_id,
+ )
+ return
self._client(payload_service_pb_grpc.PayloadServiceStub).Get(request)
+
+ def _list_secrets(self, folder_id: str, page_token: str = "") ->
secret_service_pb.ListSecretsResponse:
+ request = secret_service_pb.ListSecretsRequest(
+ folder_id=folder_id,
+ page_token=page_token,
+ )
+ return
self._client(secret_service_pb_grpc.SecretServiceStub).List(request)
diff --git a/airflow/providers/yandex/utils/__init__.py
b/airflow/providers/yandex/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/yandex/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/yandex/utils/credentials.py
b/airflow/providers/yandex/utils/credentials.py
new file mode 100644
index 0000000000..f54e8bdfbe
--- /dev/null
+++ b/airflow/providers/yandex/utils/credentials.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import logging
+from typing import Any
+
+log = logging.getLogger(__name__)
+
+
+def get_credentials(
+ oauth_token: str | None = None,
+ service_account_json: dict | str | None = None,
+ service_account_json_path: str | None = None,
+) -> dict[str, Any]:
+ """
+ Return credentials JSON for Yandex Cloud SDK based on credentials.
+
+ Credentials will be used with this priority:
+
+ * OAuth Token
+ * Service Account JSON file
+ * Service Account JSON
+ * Metadata Service
+
+ :param oauth_token: OAuth Token
+ :param service_account_json: Service Account JSON key or dict
+ :param service_account_json_path: Service Account JSON key file path
+ :return: Credentials JSON
+ """
+ if oauth_token:
+ return {"token": oauth_token}
+
+ service_account_key = get_service_account_key(
+ service_account_json=service_account_json,
+ service_account_json_path=service_account_json_path,
+ )
+ if service_account_key:
+ return {"service_account_key": service_account_key}
+
+ log.info("using metadata service as credentials")
+ return {}
+
+
+def get_service_account_key(
+ service_account_json: dict | str | None = None,
+ service_account_json_path: str | None = None,
+) -> dict[str, str] | None:
+ """
+ Return Yandex Cloud Service Account key loaded from JSON string or file.
+
+ :param service_account_json: Service Account JSON key or dict
+ :param service_account_json_path: Service Account JSON key file path
+ :return: Yandex Cloud Service Account key
+ """
+ if service_account_json_path:
+ with open(service_account_json_path) as infile:
+ service_account_json = infile.read()
+
+ if isinstance(service_account_json, dict):
+ return service_account_json
+ if service_account_json:
+ return json.loads(service_account_json)
+
+ return None
+
+
+def get_service_account_id(
+ service_account_json: dict | str | None = None,
+ service_account_json_path: str | None = None,
+) -> str | None:
+ """
+ Return Yandex Cloud Service Account ID loaded from JSON string or file.
+
+ :param service_account_json: Service Account JSON key or dict
+ :param service_account_json_path: Service Account JSON key file path
+ :return: Yandex Cloud Service Account ID
+ """
+ sa_key = get_service_account_key(
+ service_account_json=service_account_json,
+ service_account_json_path=service_account_json_path,
+ )
+ if sa_key:
+ return sa_key.get("service_account_id")
+ return None
diff --git a/airflow/providers/yandex/utils/defaults.py
b/airflow/providers/yandex/utils/defaults.py
new file mode 100644
index 0000000000..9fac3ee845
--- /dev/null
+++ b/airflow/providers/yandex/utils/defaults.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+conn_name_attr = "yandex_conn_id"
+default_conn_name = "yandexcloud_default"
+conn_type = "yandexcloud"
+hook_name = "Yandex Cloud"
diff --git a/airflow/providers/yandex/utils/fields.py
b/airflow/providers/yandex/utils/fields.py
new file mode 100644
index 0000000000..27cfc0b3b8
--- /dev/null
+++ b/airflow/providers/yandex/utils/fields.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+
+def get_field_from_extras(extras: dict[str, Any], field_name: str, default:
Any = None) -> Any:
+ """
+ Get field from extras, first checking short name, then for backcompat
checking for prefixed name.
+
+ :param extras: Dictionary with extras keys
+ :param field_name: Field name to get from extras
+ :param default: Default value if field not found
+ :return: Field value or default if not found
+ """
+ backcompat_prefix = "extra__yandexcloud__"
+ if field_name.startswith("extra__"):
+ raise ValueError(
+ f"Got prefixed name {field_name}; please remove the
'{backcompat_prefix}' prefix "
+ "when using this function."
+ )
+ if field_name in extras:
+ return extras[field_name]
+ prefixed_name = f"{backcompat_prefix}{field_name}"
+ if prefixed_name in extras:
+ return extras[prefixed_name]
+ return default
diff --git a/airflow/providers/yandex/utils/user_agent.py
b/airflow/providers/yandex/utils/user_agent.py
new file mode 100644
index 0000000000..08bb8e467f
--- /dev/null
+++ b/airflow/providers/yandex/utils/user_agent.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import warnings
+
+from airflow.providers.yandex.utils.defaults import conn_type, hook_name
+
+
+def provider_user_agent() -> str | None:
+ """Construct User-Agent from Airflow core & provider package versions."""
+ from airflow import __version__ as airflow_version
+ from airflow.configuration import conf
+ from airflow.providers_manager import ProvidersManager
+
+ try:
+ manager = ProvidersManager()
+ provider_name = manager.hooks[conn_type].package_name # type:
ignore[union-attr]
+ provider = manager.providers[provider_name]
+ return " ".join(
+ (
+ conf.get("yandex", "sdk_user_agent_prefix", fallback=""),
+ f"apache-airflow/{airflow_version}",
+ f"{provider_name}/{provider.version}",
+ )
+ ).strip()
+ except KeyError:
+ warnings.warn(
+ f"Hook '{hook_name}' info is not initialized in
airflow.ProviderManager",
+ UserWarning,
+ stacklevel=2,
+ )
+
+ return None
diff --git a/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst
b/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst
index 4de36ae043..c67676d828 100644
--- a/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst
+++ b/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst
@@ -15,39 +15,31 @@
specific language governing permissions and limitations
under the License.
+.. _yandex_cloud_connection:
Yandex.Cloud Connection
-================================
+=======================
The Yandex.Cloud connection type enables the authentication in Yandex.Cloud
services.
-Authenticating to Yandex.Cloud
----------------------------------
-
-Normally service account keys are used for Yandex.Cloud API authentication.
-https://cloud.yandex.com/docs/cli/operations/authentication/service-account
-
-As an alternative to service account key, user OAuth token can be used for
authentication.
-See the https://cloud.yandex.com/docs/cli/quickstart for obtaining a user
OAuth token.
-
-Default Connection IDs
-----------------------
-
-All hooks and operators related to Yandex.Cloud use ``yandexcloud_default``
connection by default.
-
Configuring the Connection
--------------------------
Service account auth JSON
- JSON object as a string like::
- {"id", "...", "service_account_id": "...", "private_key": "..."}
+ JSON object as a string.
+
+ Example: ``{"id": "...", "service_account_id": "...", "private_key":
"..."}``
Service account auth JSON file path
Path to the file containing service account auth JSON.
+ Example: ``/home/airflow/authorized_key.json``
+
OAuth Token
OAuth token as a string.
+ Example: ``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``
+
SSH public key (optional)
The key will be placed to all created Compute nodes, allowing to have a
root shell there.
@@ -55,8 +47,49 @@ Folder ID (optional)
Folder is a entity to separate different projects within the cloud.
If specified, this ID will be used by default during creation of nodes and
clusters.
+
See
https://cloud.yandex.com/docs/resource-manager/operations/folder/get-id for
details
Endpoint (optional)
Set API endpoint
+
See https://github.com/yandex-cloud/python-sdk for default
+
+Default Connection IDs
+----------------------
+
+All hooks and operators related to Yandex.Cloud use ``yandexcloud_default``
connection by default.
+
+Authenticating to Yandex.Cloud
+------------------------------
+
+Using Authorized keys for authorization as service account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before you start, make sure you have `created
<https://cloud.yandex.com/en/docs/iam/operations/sa/create>`__
+a Yandex Cloud `Service Account
<https://cloud.yandex.com/en/docs/iam/concepts/users/service-accounts>`__
+with the permissions ``lockbox.viewer`` and ``lockbox.payloadViewer``.
+
+First, you need to create `Authorized key
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/key>`__
+for your service account and save the generated JSON file with public and
private key parts.
+
+Then you need to specify the key in the ``Service account auth JSON`` field.
+
+Alternatively, you can specify the path to JSON file in the ``Service account
auth JSON file path`` field.
+
+Using OAuth token for authorization as users account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+First, you need to create `OAuth token
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/oauth-token>`__
for user account.
+It will looks like
``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``.
+
+Then you need to specify token in the ``OAuth Token`` field.
+
+Using metadata service
+~~~~~~~~~~~~~~~~~~~~~~
+
+If no credentials are specified, the connection will attempt to use
+the `metadata service
<https://cloud.yandex.com/en/docs/compute/concepts/vm-metadata>`__ for
authentication.
+
+To do this, you need to `link
<https://cloud.yandex.ru/en/docs/compute/operations/vm-connect/auth-inside-vm>`__
+your service account with your VM.
diff --git a/docs/apache-airflow-providers-yandex/index.rst
b/docs/apache-airflow-providers-yandex/index.rst
index 7a1736ac51..cfe0f68b30 100644
--- a/docs/apache-airflow-providers-yandex/index.rst
+++ b/docs/apache-airflow-providers-yandex/index.rst
@@ -36,6 +36,7 @@
Configuration <configurations-ref>
Connection types <connections/yandexcloud>
+ Lockbox Secret Backend
<secrets-backends/yandex-cloud-lockbox-secret-backend>
Operators <operators>
.. toctree::
diff --git
a/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst
b/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst
new file mode 100644
index 0000000000..9403dad0ea
--- /dev/null
+++
b/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst
@@ -0,0 +1,293 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Yandex.Cloud Lockbox Secret Backend
+===================================
+
+This topic describes how to configure Apache Airflow to use `Yandex Lockbox
<https://cloud.yandex.com/en/docs/lockbox>`__
+as a secret backend and how to manage secrets.
+
+Before you begin
+----------------
+
+Before you start, make sure you have installed the ``yandex`` provider in your
Apache Airflow installation:
+
+.. code-block:: bash
+
+ pip install apache-airflow-providers-yandex
+
+Enabling the Yandex Lockbox secret backend
+------------------------------------------
+
+To enable Yandex Lockbox as secrets backend,
+specify
:py:class:`~airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend`
+as the ``backend`` in ``[secrets]`` section of ``airflow.cfg`` file.
+
+Here is a sample configuration:
+
+.. code-block:: ini
+
+ [secrets]
+ backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+
+You can also set this with an environment variable:
+
+.. code-block:: bash
+
+ export
AIRFLOW__SECRETS__BACKEND=airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+
+You can verify the correct setting of the configuration options by using the
``airflow config get-value`` command:
+
+.. code-block:: console
+
+ $ airflow config get-value secrets backend
+ airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+
+Backend parameters
+------------------
+
+The next step is to configure backend parameters using the ``backend_kwargs``
options.
+You can pass the following parameters:
+
+* ``yc_oauth_token``: Specifies the user account OAuth token to connect to
Yandex Lockbox with. Looks like ``y3_xxxxx``.
+* ``yc_sa_key_json``: Specifies the service account auth JSON. Looks like
``{"id": "...", "service_account_id": "...", "private_key": "..."}``.
+* ``yc_sa_key_json_path``: Specifies the service account auth JSON file path.
Looks like ``/home/airflow/authorized_key.json``. File content looks like
``{"id": "...", "service_account_id": "...", "private_key": "..."}``.
+* ``yc_connection_id``: Specifies the connection ID to connect to Yandex
Lockbox with. Default: "yandexcloud_default"
+* ``folder_id``: Specifies the folder ID to search for Yandex Lockbox secrets
in. If set to None (null in JSON), requests will use the connection folder_id
if specified.
+* ``connections_prefix``: Specifies the prefix of the secret to read to get
Connections. If set to None (null in JSON), requests for connections will not
be sent to Yandex Lockbox. Default: "airflow/connections"
+* ``variables_prefix``: Specifies the prefix of the secret to read to get
Variables. If set to None (null in JSON), requests for variables will not be
sent to Yandex Lockbox. Default: "airflow/variables"
+* ``config_prefix``: Specifies the prefix of the secret to read to get
Configurations. If set to None (null in JSON), requests for variables will not
be sent to Yandex Lockbox. Default: "airflow/config"
+* ``sep``: Specifies the separator used to concatenate secret_prefix and
secret_id. Default: "/"
+* ``endpoint``: Specifies an API endpoint. Leave blank to use default.
+
+All options should be passed as a JSON dictionary.
+
+For example, if you want to set parameter ``connections_prefix`` to
``"example-connections-prefix"``
+and parameter ``variables_prefix`` to ``"example-variables-prefix"``,
+your configuration file should look like this:
+
+.. code-block:: ini
+
+ [secrets]
+ backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+ backend_kwargs = {"connections_prefix": "example-connections-prefix",
"variables_prefix": "example-variables-prefix"}
+
+Set-up credentials
+------------------
+
+You need to specify credentials or id of yandexcloud connection to connect to
Yandex Lockbox with.
+Credentials will be used with this priority:
+
+* OAuth Token
+* Service Account JSON file
+* Service Account JSON
+* Yandex Cloud Connection
+
+If no credentials specified, default connection id ``yandexcloud_default``
will be used.
+
+Using OAuth token for authorization as users account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+First, you need to create `OAuth token
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/oauth-token>`__
for user account.
+It will looks like
``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``.
+
+Then you need to specify the ``folder_id`` and token in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+ [secrets]
+ backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_oauth_token":
"y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc"}
+
+Using Authorized keys for authorization as service account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before you start, make sure you have `created
<https://cloud.yandex.com/en/docs/iam/operations/sa/create>`__
+a Yandex Cloud `Service Account
<https://cloud.yandex.com/en/docs/iam/concepts/users/service-accounts>`__
+with the permissions ``lockbox.viewer`` and ``lockbox.payloadViewer``.
+
+First, you need to create `Authorized key
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/key>`__
+for your service account and save the generated JSON file with public and
private key parts.
+
+Then you need to specify the ``folder_id`` and key in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+ [secrets]
+ backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_sa_key_json":
{"id": "...", "service_account_id": "...", "private_key": "..."}"}
+
+Alternatively, you can specify the path to JSON file in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+ [secrets]
+ backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j",
"yc_sa_key_json_path": "/home/airflow/authorized_key.json"}
+
+Using Yandex Cloud Connection for authorization
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+First, you need to create :ref:`Yandex Cloud Connection
<yandex_cloud_connection>`.
+
+Then you need to specify the ``connection_id`` in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+ [secrets]
+ backend_kwargs = {"yc_connection_id": "my_yc_connection"}
+
+If no credentials specified, Lockbox Secret Backend will try to use default
connection id ``yandexcloud_default``.
+
+Lockbox Secret Backend will try to use default folder id from Connection,
+also you can specify the ``folder_id`` in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+ [secrets]
+ backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_connection_id":
"my_yc_connection"}
+
+Storing and Retrieving Connections
+----------------------------------
+
+To store a Connection, you need to `create secret
<https://cloud.yandex.com/en/docs/lockbox/operations/secret-create>`__
+with name in format ``{connections_prefix}{sep}{connection_name}``
+and payload contains text value with any key.
+
+Storing a Connection as a URI
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The main way is to save connections as a :ref:`connection URI representation
<generating_connection_uri>`.
+
+Example:
``mysql://myname:[email protected]?this_param=some+val&that_param=other+val%2A``
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+ $ yc lockbox secret create \
+ --name airflow/connections/mysqldb \
+ --payload '[{"key": "value", "text_value":
"mysql://myname:[email protected]?this_param=some+val&that_param=other+val%2A"}]'
+ done (1s)
+ name: airflow/connections/mysqldb
+
+Storing a Connection as a JSON
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Alternatively, you can save connections in JSON format:
+
+.. code-block:: json
+
+ {
+ "conn_type": "mysql",
+ "host": "myhost.com",
+ "login": "myname",
+ "password": "mypassword",
+ "extra": {
+ "this_param": "some val",
+ "that_param": "other val*"
+ }
+ }
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+ $ yc lockbox secret create \
+ --name airflow/connections/mysqldbjson \
+ --payload '[{"key": "value", "text_value": "{\"conn_type\": \"mysql\",
\"host\": \"myhost.com\", \"login\": \"myname\", \"password\": \"mypassword\",
\"extra\": {\"this_param\": \"some val\", \"that_param\": \"other val*\"}}"}]'
+ done (1s)
+ name: airflow/connections/mysqldbjson
+
+Retrieving Connection
+~~~~~~~~~~~~~~~~~~~~~
+
+To check the connection is correctly read from the Lockbox Secret Backend, you
can use ``airflow connections get``:
+
+.. code-block:: console
+
+ $ airflow connections get mysqldb -o json
+ [{"id": null, "conn_id": "mysqldb", "conn_type": "mysql", "description":
null, "host": "myhost.com", "schema": "", "login": "myname", "password":
"mypassword", "port": null, "is_encrypted": "False", "is_extra_encrypted":
"False", "extra_dejson": {"this_param": "some val", "that_param": "other
val*"}, "get_uri":
"mysql://myname:[email protected]/?this_param=some+val&that_param=other+val%2A"}]
+
+Storing and Retrieving Variables
+--------------------------------
+
+To store a Variable, you need to `create secret
<https://cloud.yandex.com/en/docs/lockbox/operations/secret-create>`__
+with name in format ``{variables_prefix}{sep}{variable_name}``
+and payload contains text value with any key.
+
+This is an example variable value: ``some_secret_data``
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+ $ yc lockbox secret create \
+ --name airflow/variables/my_variable \
+ --payload '[{"key": "value", "text_value": "some_secret_data"}]'
+ done (1s)
+ name: airflow/variables/my_variable
+
+To check the variable is correctly read from the Lockbox Secret Backend, you
can use ``airflow variables get``:
+
+.. code-block:: console
+
+ $ airflow variables get my_variable
+ some_secret_data
+
+Storing and Retrieving Configs
+------------------------------
+
+You can store some sensitive configs in the Lockbox Secret Backend.
+
+For example, we will provide a secret for ``sentry.sentry_dsn`` and use
``sentry_dsn_value`` as the config value name.
+
+To store a Config, you need to `create secret
<https://cloud.yandex.com/en/docs/lockbox/operations/secret-create>`__
+with name in format ``{config_prefix}{sep}{config_value_name}``
+and payload contains text value with any key.
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+ $ yc lockbox secret create \
+ --name airflow/config/sentry_dsn_value \
+ --payload '[{"key": "value", "text_value":
"https://[email protected]/1"}]'
+ done (1s)
+ name: airflow/config/sentry_dsn_value
+
+Then, we need to specify the config value name as ``{key}_secret`` in the
Apache Airflow configuration:
+
+.. code-block:: ini
+
+ [sentry]
+ sentry_dsn_secret = sentry_dsn_value
+
+To check the config value is correctly read from the Lockbox Secret Backend,
you can use ``airflow config get-value``:
+
+.. code-block:: console
+
+ $ airflow config get-value sentry sentry_dsn
+ https://[email protected]/1
+
+Clean up
+--------
+
+You can easily delete your secret with the ``yc`` cli:
+
+.. code-block:: console
+
+ $ yc lockbox secret delete --name airflow/connections/mysqldb
+ name: airflow/connections/mysqldb
diff --git a/tests/providers/yandex/hooks/test_yandex.py
b/tests/providers/yandex/hooks/test_yandex.py
index 23b460dabf..1ba8c800c1 100644
--- a/tests/providers/yandex/hooks/test_yandex.py
+++ b/tests/providers/yandex/hooks/test_yandex.py
@@ -19,32 +19,30 @@ from __future__ import annotations
import os
from unittest import mock
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
import pytest
-from airflow.exceptions import AirflowException
from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
from tests.test_utils.config import conf_vars
class TestYandexHook:
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
-
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
- def test_client_created_without_exceptions(self, get_credentials_mock,
get_connection_mock):
+ @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+ def test_client_created_without_exceptions(self, mock_get_credentials,
mock_get_connection):
"""tests `init` method to validate client creation when all parameters
are passed"""
- # Inputs to constructor
default_folder_id = "test_id"
default_public_ssh_key = "test_key"
extra_dejson = '{"extras": "extra"}'
- get_connection_mock["extra_dejson"] = "sdsd"
- get_connection_mock.extra_dejson = '{"extras": "extra"}'
- get_connection_mock.return_value = mock.Mock(
+ mock_get_connection["extra_dejson"] = "sdsd"
+ mock_get_connection.extra_dejson = '{"extras": "extra"}'
+ mock_get_connection.return_value = mock.Mock(
connection_id="yandexcloud_default", extra_dejson=extra_dejson
)
- get_credentials_mock.return_value = {"token": 122323}
+ mock_get_credentials.return_value = {"token": 122323}
hook = YandexCloudBaseHook(
yandex_conn_id=None,
@@ -54,41 +52,54 @@ class TestYandexHook:
assert hook.client is not None
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
- def test_get_credentials_raise_exception(self, get_connection_mock):
- """tests 'get_credentials' method raising exception if none of the
required fields are passed."""
+ @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+ def test_provider_user_agent(self, mock_get_credentials,
mock_get_connection):
+ mock_get_connection.return_value =
mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
+ mock_get_credentials.return_value = {"token": 122323}
+ sdk_prefix = "MyAirflow"
- # Inputs to constructor
- default_folder_id = "test_id"
- default_public_ssh_key = "test_key"
+ with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}):
+ hook = YandexCloudBaseHook()
+ assert hook.provider_user_agent().startswith(sdk_prefix)
- extra_dejson = '{"extras": "extra"}'
- get_connection_mock["extra_dejson"] = "sdsd"
- get_connection_mock.extra_dejson = '{"extras": "extra"}'
- get_connection_mock.return_value = mock.Mock(
- connection_id="yandexcloud_default", extra_dejson=extra_dejson
- )
+ @mock.patch("airflow.hooks.base.BaseHook.get_connection")
+ @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+ def test_sdk_user_agent(self, mock_get_credentials, mock_get_connection):
+ mock_get_connection.return_value =
mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
+ mock_get_credentials.return_value = {"token": 122323}
+ sdk_prefix = "MyAirflow"
+
+ with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}):
+ hook = YandexCloudBaseHook()
+ assert hook.sdk._channels._client_user_agent.startswith(sdk_prefix)
- with pytest.raises(AirflowException):
- YandexCloudBaseHook(
- yandex_conn_id=None,
- default_folder_id=default_folder_id,
- default_public_ssh_key=default_public_ssh_key,
- )
+ @pytest.mark.parametrize(
+ "uri",
+ [
+ pytest.param(
+
"a://?extra__yandexcloud__folder_id=abc&extra__yandexcloud__public_ssh_key=abc",
id="prefix"
+ ),
+ pytest.param("a://?folder_id=abc&public_ssh_key=abc",
id="no-prefix"),
+ ],
+ )
+ @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials",
new=MagicMock())
+ def test_backcompat_prefix_works(self, uri):
+ with mock.patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}):
+ hook = YandexCloudBaseHook("my_conn")
+ assert hook.default_folder_id == "abc"
+ assert hook.default_public_ssh_key == "abc"
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
-
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
- def test_get_field(self, get_credentials_mock, get_connection_mock):
- # Inputs to constructor
+ @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+ def test_get_endpoint_specified(self, mock_get_credentials,
mock_get_connection):
default_folder_id = "test_id"
default_public_ssh_key = "test_key"
- extra_dejson = {"one": "value_one"}
- get_connection_mock["extra_dejson"] = "sdsd"
- get_connection_mock.extra_dejson = '{"extras": "extra"}'
- get_connection_mock.return_value = mock.Mock(
+ extra_dejson = {"endpoint": "my_endpoint", "something_else":
"some_value"}
+ mock_get_connection.return_value = mock.Mock(
connection_id="yandexcloud_default", extra_dejson=extra_dejson
)
- get_credentials_mock.return_value = {"token": 122323}
+ mock_get_credentials.return_value = {"token": 122323}
hook = YandexCloudBaseHook(
yandex_conn_id=None,
@@ -96,20 +107,19 @@ class TestYandexHook:
default_public_ssh_key=default_public_ssh_key,
)
- assert hook._get_field("one") == "value_one"
+ assert hook._get_endpoint() == {"endpoint": "my_endpoint"}
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
-
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
- def test_get_endpoint_specified(self, get_credentials_mock,
get_connection_mock):
- # Inputs to constructor
+ @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+ def test_get_endpoint_unspecified(self, mock_get_credentials,
mock_get_connection):
default_folder_id = "test_id"
default_public_ssh_key = "test_key"
- extra_dejson = {"endpoint": "my_endpoint", "something_else":
"some_value"}
- get_connection_mock.return_value = mock.Mock(
+ extra_dejson = {"something_else": "some_value"}
+ mock_get_connection.return_value = mock.Mock(
connection_id="yandexcloud_default", extra_dejson=extra_dejson
)
- get_credentials_mock.return_value = {"token": 122323}
+ mock_get_credentials.return_value = {"token": 122323}
hook = YandexCloudBaseHook(
yandex_conn_id=None,
@@ -117,52 +127,50 @@ class TestYandexHook:
default_public_ssh_key=default_public_ssh_key,
)
- assert hook._get_endpoint() == {"endpoint": "my_endpoint"}
+ assert hook._get_endpoint() == {}
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
-
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
- def test_get_endpoint_unspecified(self, get_credentials_mock,
get_connection_mock):
- # Inputs to constructor
+ def test__get_field(self, mock_get_connection):
+ field_name = "one"
+ field_value = "value_one"
default_folder_id = "test_id"
default_public_ssh_key = "test_key"
+ extra_dejson = {field_name: field_value}
- extra_dejson = {"something_else": "some_value"}
- get_connection_mock.return_value = mock.Mock(
+ mock_get_connection["extra_dejson"] = "sdsd"
+ mock_get_connection.extra_dejson = '{"extras": "extra"}'
+ mock_get_connection.return_value = mock.Mock(
connection_id="yandexcloud_default", extra_dejson=extra_dejson
)
- get_credentials_mock.return_value = {"token": 122323}
hook = YandexCloudBaseHook(
yandex_conn_id=None,
default_folder_id=default_folder_id,
default_public_ssh_key=default_public_ssh_key,
)
+ res = hook._get_field(
+ field_name=field_name,
+ )
- assert hook._get_endpoint() == {}
+ assert res == field_value
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
-
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
- def test_sdk_user_agent(self, get_credentials_mock, get_connection_mock):
- get_connection_mock.return_value =
mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
- get_credentials_mock.return_value = {"token": 122323}
- sdk_prefix = "MyAirflow"
+ def test__get_field_extras_not_found(self, get_connection_mock):
+ field_name = "some_field"
+ default = "some_default"
+ extra_dejson = '{"extras": "extra"}'
- with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}):
- hook = YandexCloudBaseHook()
- assert hook.sdk._channels._client_user_agent.startswith(sdk_prefix)
+ get_connection_mock["extra_dejson"] = "sdsd"
+ get_connection_mock.extra_dejson = '{"extras": "extra"}'
+ get_connection_mock.return_value = mock.Mock(
+ connection_id="yandexcloud_default", extra_dejson=extra_dejson
+ )
- @pytest.mark.parametrize(
- "uri",
- [
- pytest.param(
-
"a://?extra__yandexcloud__folder_id=abc&extra__yandexcloud__public_ssh_key=abc",
id="prefix"
- ),
- pytest.param("a://?folder_id=abc&public_ssh_key=abc",
id="no-prefix"),
- ],
- )
-
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials",
new=MagicMock())
- def test_backcompat_prefix_works(self, uri):
- with patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}):
- hook = YandexCloudBaseHook("my_conn")
- assert hook.default_folder_id == "abc"
- assert hook.default_public_ssh_key == "abc"
+ hook = YandexCloudBaseHook()
+ delattr(hook, "extras")
+ res = hook._get_field(
+ field_name=field_name,
+ default=default,
+ )
+
+ assert res == default
diff --git a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
b/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
index cf436a9b8c..71b94dcd57 100644
--- a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
+++ b/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
@@ -17,19 +17,10 @@
from __future__ import annotations
import json
-from unittest.mock import patch
-
-import pytest
+from unittest import mock
from airflow.models import Connection
-
-try:
- import yandexcloud
-
- from airflow.providers.yandex.hooks.yandexcloud_dataproc import
DataprocHook
-except ImportError:
- yandexcloud = None
-
+from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook
# Airflow connection with type "yandexcloud" must be created
CONNECTION_ID = "yandexcloud_default"
@@ -61,23 +52,22 @@ SSH_PUBLIC_KEYS = [
"cFDe6faKCxH6iDRteo4D8L8BxwzN42uZSB0nfmjkIxFTcEU3mFSXEbWByg78aoddMrAAjatyrhH1pON6P0="
]
-# If Yandex.Cloud credentials are set than full test will be run. Otherwise
only mocked tests.
+# If Yandex.Cloud credentials are set than full test will be run. Otherwise,
only mocked tests.
HAS_CREDENTIALS = OAUTH_TOKEN != "my_oauth_token"
[email protected](yandexcloud is None, reason="Skipping Yandex.Cloud hook
test: no yandexcloud module")
class TestYandexCloudDataprocHook:
def _init_hook(self):
- with patch("airflow.hooks.base.BaseHook.get_connection") as
get_connection_mock:
- get_connection_mock.return_value = self.connection
+ with mock.patch("airflow.hooks.base.BaseHook.get_connection") as
mock_get_connection:
+ mock_get_connection.return_value = self.connection
self.hook = DataprocHook()
def setup_method(self):
self.connection = Connection(extra=json.dumps({"oauth": OAUTH_TOKEN}))
self._init_hook()
- @patch("yandexcloud.SDK.create_operation_and_get_result")
- def test_create_dataproc_cluster_mocked(self, create_operation_mock):
+ @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+ def test_create_dataproc_cluster_mocked(self, mock_create_operation):
self._init_hook()
self.hook.client.create_cluster(
@@ -90,16 +80,16 @@ class TestYandexCloudDataprocHook:
cluster_image_version=CLUSTER_IMAGE_VERSION,
service_account_id=SERVICE_ACCOUNT_ID,
)
- assert create_operation_mock.called
+ assert mock_create_operation.called
- @patch("yandexcloud.SDK.create_operation_and_get_result")
- def test_delete_dataproc_cluster_mocked(self, create_operation_mock):
+ @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+ def test_delete_dataproc_cluster_mocked(self, mock_create_operation):
self._init_hook()
self.hook.client.delete_cluster("my_cluster_id")
- assert create_operation_mock.called
+ assert mock_create_operation.called
- @patch("yandexcloud.SDK.create_operation_and_get_result")
- def test_create_hive_job_hook(self, create_operation_mock):
+ @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+ def test_create_hive_job_hook(self, mock_create_operation):
self._init_hook()
self.hook.client.create_hive_job(
@@ -110,10 +100,10 @@ class TestYandexCloudDataprocHook:
query="SELECT 1;",
script_variables=None,
)
- assert create_operation_mock.called
+ assert mock_create_operation.called
- @patch("yandexcloud.SDK.create_operation_and_get_result")
- def test_create_mapreduce_job_hook(self, create_operation_mock):
+ @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+ def test_create_mapreduce_job_hook(self, mock_create_operation):
self._init_hook()
self.hook.client.create_mapreduce_job(
@@ -145,10 +135,10 @@ class TestYandexCloudDataprocHook:
"mapreduce.job.maps": "6",
},
)
- assert create_operation_mock.called
+ assert mock_create_operation.called
- @patch("yandexcloud.SDK.create_operation_and_get_result")
- def test_create_spark_job_hook(self, create_operation_mock):
+ @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+ def test_create_spark_job_hook(self, mock_create_operation):
self._init_hook()
self.hook.client.create_spark_job(
@@ -170,10 +160,10 @@ class TestYandexCloudDataprocHook:
name="Spark job",
properties={"spark.submit.deployMode": "cluster"},
)
- assert create_operation_mock.called
+ assert mock_create_operation.called
- @patch("yandexcloud.SDK.create_operation_and_get_result")
- def test_create_pyspark_job_hook(self, create_operation_mock):
+ @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+ def test_create_pyspark_job_hook(self, mock_create_operation):
self._init_hook()
self.hook.client.create_pyspark_job(
@@ -194,4 +184,4 @@ class TestYandexCloudDataprocHook:
properties={"spark.submit.deployMode": "cluster"},
python_file_uris=["s3a://some-in-bucket/jobs/sources/pyspark-001/geonames.py"],
)
- assert create_operation_mock.called
+ assert mock_create_operation.called
diff --git a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
b/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
index 879645daf6..083e3d538a 100644
--- a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
+++ b/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
@@ -76,10 +76,10 @@ class TestDataprocClusterCreateOperator:
schedule="@daily",
)
-
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+ @patch("airflow.providers.yandex.utils.credentials.get_credentials")
@patch("airflow.hooks.base.BaseHook.get_connection")
@patch("yandexcloud._wrappers.dataproc.Dataproc.create_cluster")
- def test_create_cluster(self, create_cluster_mock, *_):
+ def test_create_cluster(self, mock_create_cluster, *_):
operator = DataprocCreateClusterOperator(
task_id="create_cluster",
ssh_public_keys=SSH_PUBLIC_KEYS,
@@ -93,7 +93,7 @@ class TestDataprocClusterCreateOperator:
)
context = {"task_instance": MagicMock()}
operator.execute(context)
- create_cluster_mock.assert_called_once_with(
+ mock_create_cluster.assert_called_once_with(
cluster_description="",
cluster_image_version="1.4",
cluster_name=None,
@@ -135,15 +135,15 @@ class TestDataprocClusterCreateOperator:
)
context["task_instance"].xcom_push.assert_has_calls(
[
- call(key="cluster_id",
value=create_cluster_mock().response.id),
+ call(key="cluster_id",
value=mock_create_cluster().response.id),
call(key="yandexcloud_connection_id", value=CONNECTION_ID),
]
)
-
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+ @patch("airflow.providers.yandex.utils.credentials.get_credentials")
@patch("airflow.hooks.base.BaseHook.get_connection")
@patch("yandexcloud._wrappers.dataproc.Dataproc.delete_cluster")
- def test_delete_cluster_operator(self, delete_cluster_mock, *_):
+ def test_delete_cluster_operator(self, mock_delete_cluster, *_):
operator = DataprocDeleteClusterOperator(
task_id="delete_cluster",
connection_id=CONNECTION_ID,
@@ -152,12 +152,12 @@ class TestDataprocClusterCreateOperator:
context["task_instance"].xcom_pull.return_value = "my_cluster_id"
operator.execute(context)
context["task_instance"].xcom_pull.assert_called_once_with(key="cluster_id")
- delete_cluster_mock.assert_called_once_with("my_cluster_id")
+ mock_delete_cluster.assert_called_once_with("my_cluster_id")
-
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+ @patch("airflow.providers.yandex.utils.credentials.get_credentials")
@patch("airflow.hooks.base.BaseHook.get_connection")
@patch("yandexcloud._wrappers.dataproc.Dataproc.create_hive_job")
- def test_create_hive_job_operator(self, create_hive_job_mock, *_):
+ def test_create_hive_job_operator(self, mock_create_hive_job, *_):
operator = DataprocCreateHiveJobOperator(
task_id="create_hive_job",
query="SELECT 1;",
@@ -173,7 +173,7 @@ class TestDataprocClusterCreateOperator:
]
)
- create_hive_job_mock.assert_called_once_with(
+ mock_create_hive_job.assert_called_once_with(
cluster_id="my_cluster_id",
continue_on_failure=False,
name="Hive job",
@@ -183,10 +183,10 @@ class TestDataprocClusterCreateOperator:
script_variables=None,
)
-
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+ @patch("airflow.providers.yandex.utils.credentials.get_credentials")
@patch("airflow.hooks.base.BaseHook.get_connection")
@patch("yandexcloud._wrappers.dataproc.Dataproc.create_mapreduce_job")
- def test_create_mapreduce_job_operator(self, create_mapreduce_job_mock,
*_):
+ def test_create_mapreduce_job_operator(self, mock_create_mapreduce_job,
*_):
operator = DataprocCreateMapReduceJobOperator(
task_id="run_mapreduce_job",
main_class="org.apache.hadoop.streaming.HadoopStreaming",
@@ -223,7 +223,7 @@ class TestDataprocClusterCreateOperator:
]
)
- create_mapreduce_job_mock.assert_called_once_with(
+ mock_create_mapreduce_job.assert_called_once_with(
archive_uris=None,
args=[
"-mapper",
@@ -253,10 +253,10 @@ class TestDataprocClusterCreateOperator:
},
)
-
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+ @patch("airflow.providers.yandex.utils.credentials.get_credentials")
@patch("airflow.hooks.base.BaseHook.get_connection")
@patch("yandexcloud._wrappers.dataproc.Dataproc.create_spark_job")
- def test_create_spark_job_operator(self, create_spark_job_mock, *_):
+ def test_create_spark_job_operator(self, mock_create_spark_job, *_):
operator = DataprocCreateSparkJobOperator(
task_id="create_spark_job",
main_jar_file_uri="s3a://data-proc-public/jobs/sources/java/dataproc-examples-1.0.jar",
@@ -292,7 +292,7 @@ class TestDataprocClusterCreateOperator:
]
)
- create_spark_job_mock.assert_called_once_with(
+ mock_create_spark_job.assert_called_once_with(
archive_uris=["s3a://some-in-bucket/jobs/sources/data/country-codes.csv.zip"],
args=[
"s3a://some-in-bucket/jobs/sources/data/cities500.txt.bz2",
@@ -315,10 +315,10 @@ class TestDataprocClusterCreateOperator:
exclude_packages=None,
)
-
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+ @patch("airflow.providers.yandex.utils.credentials.get_credentials")
@patch("airflow.hooks.base.BaseHook.get_connection")
@patch("yandexcloud._wrappers.dataproc.Dataproc.create_pyspark_job")
- def test_create_pyspark_job_operator(self, create_pyspark_job_mock, *_):
+ def test_create_pyspark_job_operator(self, mock_create_pyspark_job, *_):
operator = DataprocCreatePysparkJobOperator(
task_id="create_pyspark_job",
main_python_file_uri="s3a://some-in-bucket/jobs/sources/pyspark-001/main.py",
@@ -355,7 +355,7 @@ class TestDataprocClusterCreateOperator:
]
)
- create_pyspark_job_mock.assert_called_once_with(
+ mock_create_pyspark_job.assert_called_once_with(
archive_uris=["s3a://some-in-bucket/jobs/sources/data/country-codes.csv.zip"],
args=[
"s3a://some-in-bucket/jobs/sources/data/cities500.txt.bz2",
diff --git a/tests/providers/yandex/secrets/__init__.py
b/tests/providers/yandex/secrets/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/yandex/secrets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/yandex/secrets/test_lockbox.py
b/tests/providers/yandex/secrets/test_lockbox.py
new file mode 100644
index 0000000000..e51724f866
--- /dev/null
+++ b/tests/providers/yandex/secrets/test_lockbox.py
@@ -0,0 +1,435 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest.mock import MagicMock, Mock, patch
+
+import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb
+import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb
+import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb
+
+from airflow.providers.yandex.secrets.lockbox import LockboxSecretBackend
+from airflow.providers.yandex.utils.defaults import default_conn_name
+
+
+class TestLockboxSecretBackend:
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def test_yandex_lockbox_secret_backend_get_connection(self,
mock_get_value):
+ conn_id = "fake_conn"
+ conn_type = "scheme"
+ host = "host"
+ login = "user"
+ password = "pass"
+ port = 100
+ uri = f"{conn_type}://{login}:{password}@{host}:{port}"
+
+ mock_get_value.return_value = uri
+
+ conn = LockboxSecretBackend().get_connection(conn_id)
+
+ assert conn.conn_id == conn_id
+ assert conn.conn_type == conn_type
+ assert conn.host == host
+ assert conn.schema == ""
+ assert conn.login == login
+ assert conn.password == password
+ assert conn.port == port
+ assert conn.get_uri() == uri
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def test_yandex_lockbox_secret_backend_get_connection_from_json(self,
mock_get_value):
+ conn_id = "airflow_to_yandexcloud"
+ conn_type = "yandex_cloud"
+ extra = "some extra values"
+ c = {
+ "conn_type": conn_type,
+ "extra": extra,
+ }
+
+ mock_get_value.return_value = json.dumps(c)
+
+ conn = LockboxSecretBackend().get_connection(conn_id)
+
+ assert conn.conn_id == conn_id
+ assert conn.conn_type == conn_type
+ assert conn.extra == extra
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def test_yandex_lockbox_secret_backend_get_variable(self, mock_get_value):
+ k = "thisiskey"
+ v = "thisisvalue"
+
+ mock_get_value.return_value = v
+
+ value = LockboxSecretBackend().get_variable(k)
+
+ assert value == v
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def test_yandex_lockbox_secret_backend_get_config(self, mock_get_value):
+ k = "thisiskey"
+ v = "thisisvalue"
+
+ mock_get_value.return_value = v
+
+ value = LockboxSecretBackend().get_config(k)
+
+ assert value == v
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def test_yandex_lockbox_secret_backend_get_connection_prefix_is_none(self,
mock_get_value):
+ uri = "scheme://user:pass@host:100"
+
+ mock_get_value.return_value = uri
+
+ conn = LockboxSecretBackend(
+ connections_prefix=None,
+ ).get_connection("fake_conn")
+
+ assert conn is None
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def
test_yandex_lockbox_secret_backend_get_connection_with_oauth_token_auth(self,
mock_get_value):
+ conn_id = "yandex_cloud"
+ uri = "scheme://user:pass@host:100"
+
+ mock_get_value.return_value = uri
+
+ conn = LockboxSecretBackend(
+
yc_oauth_token="y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc",
+ ).get_connection(conn_id)
+
+ assert conn.conn_id == conn_id
+ assert conn.get_uri() == uri
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def
test_yandex_lockbox_secret_backend_get_connection_conn_id_for_backend(self,
mock_get_value):
+ conn_id = "yandex_cloud"
+ uri = "scheme://user:pass@host:100"
+
+ mock_get_value.return_value = uri
+
+ conn = LockboxSecretBackend(
+ yc_connection_id=conn_id,
+ ).get_connection(conn_id)
+
+ assert conn is None
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def
test_yandex_lockbox_secret_backend_get_connection_default_conn_id(self,
mock_get_value):
+ conn_id = default_conn_name
+ uri = "scheme://user:pass@host:100"
+
+ mock_get_value.return_value = uri
+
+ conn = LockboxSecretBackend().get_connection(conn_id)
+
+ assert conn is None
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def test_yandex_lockbox_secret_backend_get_variable_prefix_is_none(self,
mock_get_value):
+ k = "thisiskey"
+ v = "thisisvalue"
+
+ mock_get_value.return_value = v
+
+ value = LockboxSecretBackend(
+ variables_prefix=None,
+ ).get_variable(k)
+
+ assert value is None
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+ def test_yandex_lockbox_secret_backend_get_config_prefix_is_none(self,
mock_get_value):
+ k = "thisiskey"
+ v = "thisisvalue"
+
+ mock_get_value.return_value = v
+
+ value = LockboxSecretBackend(
+ config_prefix=None,
+ ).get_config(k)
+
+ assert value is None
+
+ def
test_yandex_lockbox_secret_backend__client_created_without_exceptions(self):
+ yc_oauth_token =
"y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc"
+
+ sm = LockboxSecretBackend(
+ yc_oauth_token=yc_oauth_token,
+ )
+
+ assert sm._client is not None
+
+ def test_yandex_lockbox_secret_backedn__get_endpoint(self):
+ endpoint = "api.cloud.yandex.net"
+ expected = {
+ "endpoint": endpoint,
+ }
+
+ res = LockboxSecretBackend(
+ endpoint=endpoint,
+ )._get_endpoint()
+
+ assert res == expected
+
+ def test_yandex_lockbox_secret_backedn__get_endpoint_not_specified(self):
+ expected = {}
+
+ res = LockboxSecretBackend()._get_endpoint()
+
+ assert res == expected
+
+ def test_yandex_lockbox_secret_backend__build_secret_name(self):
+ prefix = "thiisprefix"
+ key = "thisiskey"
+ expected = "thiisprefix/thisiskey"
+
+ res = LockboxSecretBackend()._build_secret_name(prefix, key)
+
+ assert res == expected
+
+ def test_yandex_lockbox_secret_backend__build_secret_name_no_prefix(self):
+ prefix = ""
+ key = "thisiskey"
+ expected = "thisiskey"
+
+ res = LockboxSecretBackend()._build_secret_name(prefix, key)
+
+ assert res == expected
+
+ def test_yandex_lockbox_secret_backend__build_secret_name_custom_sep(self):
+ sep = "_"
+ prefix = "thiisprefix"
+ key = "thisiskey"
+ expected = "thiisprefix_thisiskey"
+
+ res = LockboxSecretBackend(
+ sep=sep,
+ )._build_secret_name(prefix, key)
+
+ assert res == expected
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets")
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload")
+ def test_yandex_lockbox_secret_backend__get_secret_value(self,
mock_get_payload, mock_get_secrets):
+ target_name = "target_name"
+ target_text = "target_text"
+
+ mock_get_secrets.return_value = [
+ secret_pb.Secret(
+ id="123",
+ name="one",
+ ),
+ secret_pb.Secret(
+ id="456",
+ name=target_name,
+ ),
+ secret_pb.Secret(
+ id="789",
+ name="two",
+ ),
+ ]
+ mock_get_payload.return_value = payload_pb.Payload(
+ entries=[
+ payload_pb.Payload.Entry(text_value=target_text),
+ ],
+ )
+
+ res = LockboxSecretBackend()._get_secret_value("", target_name)
+
+ assert res == target_text
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets")
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload")
+ def test_yandex_lockbox_secret_backend__get_secret_value_not_found(
+ self, mock_get_payload, mock_get_secrets
+ ):
+ target_name = "target_name"
+ target_text = "target_text"
+
+ mock_get_secrets.return_value = [
+ secret_pb.Secret(
+ id="123",
+ name="one",
+ ),
+ secret_pb.Secret(
+ id="789",
+ name="two",
+ ),
+ ]
+ mock_get_payload.return_value = payload_pb.Payload(
+ entries=[
+ payload_pb.Payload.Entry(text_value=target_text),
+ ],
+ )
+
+ res = LockboxSecretBackend()._get_secret_value("", target_name)
+
+ assert res is None
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets")
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload")
+ def test_yandex_lockbox_secret_backend__get_secret_value_no_text_entries(
+ self, mock_get_payload, mock_get_secrets
+ ):
+ target_name = "target_name"
+ target_value = b"01010101"
+
+ mock_get_secrets.return_value = [
+ secret_pb.Secret(
+ id="123",
+ name="one",
+ ),
+ secret_pb.Secret(
+ id="456",
+ name="two",
+ ),
+ secret_pb.Secret(
+ id="789",
+ name=target_name,
+ ),
+ ]
+ mock_get_payload.return_value = payload_pb.Payload(
+ entries=[
+ payload_pb.Payload.Entry(binary_value=target_value),
+ ],
+ )
+
+ res = LockboxSecretBackend()._get_secret_value("", target_name)
+
+ assert res is None
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._list_secrets")
+ def test_yandex_lockbox_secret_backend__get_secrets(self,
mock_list_secrets):
+ secrets = secret_service_pb.ListSecretsResponse(
+ secrets=[
+ secret_pb.Secret(
+ id="123",
+ ),
+ secret_pb.Secret(
+ id="456",
+ ),
+ ],
+ )
+
+ mock_list_secrets.return_value = secrets
+
+ res = LockboxSecretBackend(
+ folder_id="someid",
+ )._get_secrets()
+
+ assert res == secrets.secrets
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._list_secrets")
+ def test_yandex_lockbox_secret_backend__get_secrets_page_token(self,
mock_list_secrets):
+ first_secrets = secret_service_pb.ListSecretsResponse(
+ secrets=[
+ secret_pb.Secret(
+ id="123",
+ ),
+ secret_pb.Secret(
+ id="456",
+ ),
+ ],
+ next_page_token="token",
+ )
+ second_secrets = secret_service_pb.ListSecretsResponse(
+ secrets=[
+ secret_pb.Secret(
+ id="789",
+ ),
+ secret_pb.Secret(
+ id="000",
+ ),
+ ],
+ next_page_token="",
+ )
+
+ mock_list_secrets.side_effect = [
+ first_secrets,
+ second_secrets,
+ ]
+
+ res = LockboxSecretBackend(
+ folder_id="someid",
+ )._get_secrets()
+
+ assert res == [*first_secrets.secrets, *second_secrets.secrets]
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._client")
+ def test_yandex_lockbox_secret_backend__get_payload(self, mock_client):
+ mock_stub = MagicMock()
+ mock_response = payload_pb.Payload()
+ mock_stub.Get.return_value = mock_response
+ mock_client.return_value = mock_stub
+
+ result = LockboxSecretBackend()._get_payload(
+ secret_id="test_secret",
+ version_id="test_version",
+ )
+
+ mock_client.assert_called_once()
+ mock_stub.Get.assert_called_once()
+ assert result == mock_response
+
+
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._client")
+ def test_yandex_lockbox_secret_backend__list_secrets(self, mock_client):
+ mock_stub = MagicMock()
+ mock_response = secret_service_pb.ListSecretsResponse()
+ mock_stub.List.return_value = mock_response
+ mock_client.return_value = mock_stub
+
+ result = LockboxSecretBackend()._list_secrets(
+ folder_id="test_folder",
+ )
+
+ mock_client.assert_called_once()
+ mock_stub.List.assert_called_once()
+ assert result == mock_response
+
+ def test_yandex_lockbox_secret_backend_folder_id(self):
+ folder_id = "id1"
+
+ res = LockboxSecretBackend(
+ folder_id=folder_id,
+ ).folder_id
+
+ assert res == folder_id
+
+ @patch("airflow.models.connection.Connection.get_connection_from_secrets")
+ def test_yandex_lockbox_secret_backend_folder_id_from_connection(self,
mock_get_connection):
+ folder_id = "id1"
+
+ mock_get_connection.return_value = Mock(
+ connection_id=default_conn_name,
+ extra_dejson={"folder_id": folder_id},
+ )
+
+ sm = LockboxSecretBackend()
+ _ = sm._client
+ res = sm.folder_id
+
+ assert res == folder_id
+
+ def
test_yandex_lockbox_secret_backend__get_field_connection_not_specified(self):
+ sm = LockboxSecretBackend()
+ sm.yc_connection_id = None
+ res = sm._get_field("somefield")
+
+ assert res is None
diff --git a/tests/providers/yandex/utils/__init__.py
b/tests/providers/yandex/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/yandex/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/yandex/utils/test_credentials.py
b/tests/providers/yandex/utils/test_credentials.py
new file mode 100644
index 0000000000..5bc1e17490
--- /dev/null
+++ b/tests/providers/yandex/utils/test_credentials.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest import mock
+
+from airflow.providers.yandex.utils.credentials import (
+ get_credentials,
+ get_service_account_id,
+ get_service_account_key,
+)
+
+
+def test_get_credentials_oauth_token():
+ oauth_token = "y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc"
+ service_account_key = {
+ "id": "...",
+ "service_account_id": "...",
+ "private_key": "...",
+ }
+ service_account_key_json = json.dumps(service_account_key)
+ service_account_file_path = "/home/airflow/authorized_key.json"
+ expected = {"token": oauth_token}
+
+ res = get_credentials(
+ oauth_token=oauth_token,
+ service_account_json=service_account_key_json,
+ service_account_json_path=service_account_file_path,
+ )
+
+ assert res == expected
+
+
[email protected]("airflow.providers.yandex.utils.credentials.get_service_account_key")
+def test_get_credentials_service_account_key(mock_get_service_account_key):
+ service_account_key = {
+ "id": "...",
+ "service_account_id": "...",
+ "private_key": "...",
+ }
+ service_account_key_json = json.dumps(service_account_key)
+ service_account_file_path = "/home/airflow/authorized_key.json"
+ expected = {"service_account_key": service_account_key}
+
+ mock_get_service_account_key.return_value = service_account_key
+
+ res = get_credentials(
+ service_account_json=service_account_key_json,
+ service_account_json_path=service_account_file_path,
+ )
+
+ assert res == expected
+
+
+def test_get_credentials_metadata_service(caplog):
+ expected = {}
+
+ res = get_credentials()
+
+ assert res == expected
+ assert "using metadata service as credentials" in caplog.text
+
+
+def test_get_service_account_key():
+ service_account_key = {
+ "id": "...",
+ "service_account_id": "...",
+ "private_key": "...",
+ }
+ service_account_key_json = json.dumps(service_account_key)
+ expected = service_account_key
+
+ res = get_service_account_key(
+ service_account_json=service_account_key_json,
+ )
+
+ assert res == expected
+
+
+def test_get_service_account_dict():
+ service_account_key = {
+ "id": "...",
+ "service_account_id": "...",
+ "private_key": "...",
+ }
+ expected = service_account_key
+
+ res = get_service_account_key(
+ service_account_json=service_account_key,
+ )
+
+ assert res == expected
+
+
+def test_get_service_account_key_file(tmp_path):
+ service_account_key = {
+ "id": "...",
+ "service_account_id": "...",
+ "private_key": "...",
+ }
+ service_account_key_json = json.dumps(service_account_key)
+ service_account_file = tmp_path / "authorized_key.json"
+ service_account_file.write_text(service_account_key_json)
+ service_account_file_path = str(service_account_file)
+ expected = service_account_key
+
+ res = get_service_account_key(
+ service_account_json=service_account_key_json,
+ service_account_json_path=service_account_file_path,
+ )
+
+ assert res == expected
+
+
+def test_get_service_account_key_none():
+ expected = None
+
+ res = get_service_account_key()
+
+ assert res == expected
+
+
[email protected]("airflow.providers.yandex.utils.credentials.get_service_account_key")
+def test_get_service_account_id(mock_get_service_account_key):
+ service_account_id = "this_is_service_account_id"
+ service_account_key = {
+ "id": "...",
+ "service_account_id": service_account_id,
+ "private_key": "...",
+ }
+ service_account_key_json = json.dumps(service_account_key)
+ service_account_file_path = "/home/airflow/authorized_key.json"
+ expected = service_account_id
+
+ mock_get_service_account_key.return_value = service_account_key
+
+ res = get_service_account_id(
+ service_account_json=service_account_key_json,
+ service_account_json_path=service_account_file_path,
+ )
+
+ assert res == expected
+
+
[email protected]("airflow.providers.yandex.utils.credentials.get_service_account_key")
+def test_get_service_account_id_none(mock_get_service_account_key):
+ expected = None
+
+ mock_get_service_account_key.return_value = None
+
+ res = get_service_account_id()
+
+ assert res == expected
diff --git a/tests/providers/yandex/utils/test_defaults.py
b/tests/providers/yandex/utils/test_defaults.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/yandex/utils/test_defaults.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/yandex/utils/test_fields.py
b/tests/providers/yandex/utils/test_fields.py
new file mode 100644
index 0000000000..3b42282324
--- /dev/null
+++ b/tests/providers/yandex/utils/test_fields.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.yandex.utils.fields import get_field_from_extras
+
+
+def test_get_field_from_extras():
+ field_name = "somefield"
+ default = None
+ expected = "somevalue"
+ extras = {
+ field_name: expected,
+ }
+
+ res = get_field_from_extras(
+ extras=extras,
+ field_name=field_name,
+ default=default,
+ )
+
+ assert res == expected
+
+
+def test_get_field_from_extras_not_found():
+ field_name = "somefield"
+ default = "default"
+ expected = default
+ extras = {}
+
+ res = get_field_from_extras(
+ extras=extras,
+ field_name=field_name,
+ default=default,
+ )
+
+ assert res == expected
+
+
+def test_get_field_from_extras_prefixed_in_extra():
+ field_name = "somefield"
+ default = None
+ expected = "somevalue"
+ extras = {
+ f"extra__yandexcloud__{field_name}": expected,
+ }
+
+ res = get_field_from_extras(
+ extras=extras,
+ field_name=field_name,
+ default=default,
+ )
+
+ assert res == expected
+
+
+def test_get_field_from_extras_field_name_with_extra_raise_exception():
+ field_name = "extra__yandexcloud__fieldname"
+ default = None
+ extras = {}
+
+ with pytest.raises(ValueError):
+ get_field_from_extras(
+ extras=extras,
+ field_name=field_name,
+ default=default,
+ )
diff --git a/tests/providers/yandex/utils/test_user_agent.py
b/tests/providers/yandex/utils/test_user_agent.py
new file mode 100644
index 0000000000..854549e391
--- /dev/null
+++ b/tests/providers/yandex/utils/test_user_agent.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.yandex.utils.user_agent import provider_user_agent
+
+
+def test_provider_user_agent():
+ user_agent = provider_user_agent()
+
+ from airflow import __version__ as airflow_version
+
+ user_agent_airflow = f"apache-airflow/{airflow_version}"
+ assert user_agent_airflow in user_agent
+
+ from airflow.providers_manager import ProvidersManager
+
+ manager = ProvidersManager()
+ provider_name = manager.hooks["yandexcloud"].package_name
+ provider = manager.providers[provider_name]
+ user_agent_provider = f"{provider_name}/{provider.version}"
+ assert user_agent_provider in user_agent
+
+ from airflow.configuration import conf
+
+ user_agent_prefix = conf.get("yandex", "sdk_user_agent_prefix",
fallback="")
+ assert user_agent_prefix in user_agent
+
+
[email protected]("airflow.providers_manager.ProvidersManager.hooks")
+def test_provider_user_agent_hook_not_exists(mock_hooks):
+ mock_hooks.return_value = []
+
+ user_agent = provider_user_agent()
+
+ assert user_agent is None