This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 12ccb5f0ac feat: add Yandex Cloud Lockbox secrets backend (#36449)
12ccb5f0ac is described below

commit 12ccb5f0ac34007b6cbea4f6a6d9cc6811d71268
Author: Vadim Vladimirov 
<[email protected]>
AuthorDate: Thu Jan 25 14:10:00 2024 +0300

    feat: add Yandex Cloud Lockbox secrets backend (#36449)
    
    * refactor: move credentials logic to utils
    
    * docs: using metadata service in Yandex.Cloud Connection
    
    * feat: Yandex Cloud Lockbox Secret Backend
    
    * docs: Yandex LockboxSecretBackend
---
 airflow/providers/yandex/hooks/yandex.py           | 122 ++----
 airflow/providers/yandex/secrets/__init__.py       |  16 +
 airflow/providers/yandex/secrets/lockbox.py        | 280 +++++++++++++
 airflow/providers/yandex/utils/__init__.py         |  16 +
 airflow/providers/yandex/utils/credentials.py      | 100 +++++
 airflow/providers/yandex/utils/defaults.py         |  22 ++
 airflow/providers/yandex/utils/fields.py           |  42 ++
 airflow/providers/yandex/utils/user_agent.py       |  48 +++
 .../connections/yandexcloud.rst                    |  67 +++-
 docs/apache-airflow-providers-yandex/index.rst     |   1 +
 .../yandex-cloud-lockbox-secret-backend.rst        | 293 ++++++++++++++
 tests/providers/yandex/hooks/test_yandex.py        | 152 +++----
 .../yandex/hooks/test_yandexcloud_dataproc.py      |  56 ++-
 .../yandex/operators/test_yandexcloud_dataproc.py  |  38 +-
 tests/providers/yandex/secrets/__init__.py         |  16 +
 tests/providers/yandex/secrets/test_lockbox.py     | 435 +++++++++++++++++++++
 tests/providers/yandex/utils/__init__.py           |  16 +
 tests/providers/yandex/utils/test_credentials.py   | 168 ++++++++
 tests/providers/yandex/utils/test_defaults.py      |  16 +
 tests/providers/yandex/utils/test_fields.py        |  83 ++++
 tests/providers/yandex/utils/test_user_agent.py    |  52 +++
 21 files changed, 1816 insertions(+), 223 deletions(-)

diff --git a/airflow/providers/yandex/hooks/yandex.py 
b/airflow/providers/yandex/hooks/yandex.py
index 02bf037ae5..67afc2d59f 100644
--- a/airflow/providers/yandex/hooks/yandex.py
+++ b/airflow/providers/yandex/hooks/yandex.py
@@ -16,27 +16,37 @@
 # under the License.
 from __future__ import annotations
 
-import json
 import warnings
 from typing import Any
 
 import yandexcloud
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.hooks.base import BaseHook
+from airflow.providers.yandex.utils.credentials import (
+    get_credentials,
+    get_service_account_id,
+)
+from airflow.providers.yandex.utils.defaults import conn_name_attr, conn_type, 
default_conn_name, hook_name
+from airflow.providers.yandex.utils.fields import get_field_from_extras
+from airflow.providers.yandex.utils.user_agent import provider_user_agent
 
 
 class YandexCloudBaseHook(BaseHook):
     """
     A base hook for Yandex.Cloud related tasks.
 
-    :param yandex_conn_id: The connection ID to use when fetching connection 
info.
+    :param yandex_conn_id: The connection ID to use when fetching connection 
info
+    :param connection_id: Deprecated, use yandex_conn_id instead
+    :param default_folder_id: The folder ID to use instead of connection 
folder ID
+    :param default_public_ssh_key: The key to use instead of connection key
+    :param default_service_account_id: The service account ID to use instead 
of key service account ID
     """
 
-    conn_name_attr = "yandex_conn_id"
-    default_conn_name = "yandexcloud_default"
-    conn_type = "yandexcloud"
-    hook_name = "Yandex Cloud"
+    conn_name_attr = conn_name_attr
+    default_conn_name = default_conn_name
+    conn_type = conn_type
+    hook_name = hook_name
 
     @classmethod
     def get_connection_form_widgets(cls) -> dict[str, Any]:
@@ -50,14 +60,14 @@ class YandexCloudBaseHook(BaseHook):
                 lazy_gettext("Service account auth JSON"),
                 widget=BS3PasswordFieldWidget(),
                 description="Service account auth JSON. Looks like "
-                '{"id", "...", "service_account_id": "...", "private_key": 
"..."}. '
+                '{"id": "...", "service_account_id": "...", "private_key": 
"..."}. '
                 "Will be used instead of OAuth token and SA JSON file path 
field if specified.",
             ),
             "service_account_json_path": StringField(
                 lazy_gettext("Service account auth JSON file path"),
                 widget=BS3TextFieldWidget(),
                 description="Service account auth JSON file path. File content 
looks like "
-                '{"id", "...", "service_account_id": "...", "private_key": 
"..."}. '
+                '{"id": "...", "service_account_id": "...", "private_key": 
"..."}. '
                 "Will be used instead of OAuth token if specified.",
             ),
             "oauth": PasswordField(
@@ -75,7 +85,7 @@ class YandexCloudBaseHook(BaseHook):
             "public_ssh_key": StringField(
                 lazy_gettext("Public SSH key"),
                 widget=BS3TextFieldWidget(),
-                description="Optional. This key will be placed to all created 
Compute nodes"
+                description="Optional. This key will be placed to all created 
Compute nodes "
                 "to let you have a root shell there",
             ),
             "endpoint": StringField(
@@ -87,30 +97,13 @@ class YandexCloudBaseHook(BaseHook):
 
     @classmethod
     def provider_user_agent(cls) -> str | None:
-        """Construct User-Agent from Airflow core & provider package 
versions."""
-        from airflow import __version__ as airflow_version
-        from airflow.configuration import conf
-        from airflow.providers_manager import ProvidersManager
-
-        try:
-            manager = ProvidersManager()
-            provider_name = manager.hooks[cls.conn_type].package_name  # type: 
ignore[union-attr]
-            provider = manager.providers[provider_name]
-            return " ".join(
-                (
-                    conf.get("yandex", "sdk_user_agent_prefix", fallback=""),
-                    f"apache-airflow/{airflow_version}",
-                    f"{provider_name}/{provider.version}",
-                )
-            ).strip()
-        except KeyError:
-            warnings.warn(
-                f"Hook '{cls.hook_name}' info is not initialized in 
airflow.ProviderManager",
-                UserWarning,
-                stacklevel=2,
-            )
-
-            return None
+        warnings.warn(
+            "Using `provider_user_agent` in `YandexCloudBaseHook` is 
deprecated. "
+            "Please use it in `utils.user_agent` instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=2,
+        )
+        return provider_user_agent()
 
     @classmethod
     def get_ui_field_behaviour(cls) -> dict[str, Any]:
@@ -122,7 +115,7 @@ class YandexCloudBaseHook(BaseHook):
 
     def __init__(
         self,
-        # Connection id is deprecated. Use yandex_conn_id instead
+        # connection_id is deprecated, use yandex_conn_id instead
         connection_id: str | None = None,
         yandex_conn_id: str | None = None,
         default_folder_id: str | None = None,
@@ -137,46 +130,23 @@ class YandexCloudBaseHook(BaseHook):
                 AirflowProviderDeprecationWarning,
                 stacklevel=2,
             )
-        self.connection_id = yandex_conn_id or connection_id or 
self.default_conn_name
+        self.connection_id = yandex_conn_id or connection_id or 
default_conn_name
         self.connection = self.get_connection(self.connection_id)
         self.extras = self.connection.extra_dejson
-        credentials = self._get_credentials()
+        credentials = get_credentials(
+            oauth_token=self._get_field("oauth"),
+            service_account_json=self._get_field("service_account_json"),
+            
service_account_json_path=self._get_field("service_account_json_path"),
+        )
         sdk_config = self._get_endpoint()
-        self.sdk = yandexcloud.SDK(user_agent=self.provider_user_agent(), 
**sdk_config, **credentials)
+        self.sdk = yandexcloud.SDK(user_agent=provider_user_agent(), 
**sdk_config, **credentials)
         self.default_folder_id = default_folder_id or 
self._get_field("folder_id")
         self.default_public_ssh_key = default_public_ssh_key or 
self._get_field("public_ssh_key")
-        self.default_service_account_id = default_service_account_id or 
self._get_service_account_id()
-        self.client = self.sdk.client
-
-    def _get_service_account_key(self) -> dict[str, str] | None:
-        service_account_json = self._get_field("service_account_json")
-        service_account_json_path = 
self._get_field("service_account_json_path")
-        if service_account_json_path:
-            with open(service_account_json_path) as infile:
-                service_account_json = infile.read()
-        if service_account_json:
-            return json.loads(service_account_json)
-        return None
-
-    def _get_service_account_id(self) -> str | None:
-        sa_key = self._get_service_account_key()
-        if sa_key:
-            return sa_key.get("service_account_id")
-        return None
-
-    def _get_credentials(self) -> dict[str, Any]:
-        oauth_token = self._get_field("oauth")
-        if oauth_token:
-            return {"token": oauth_token}
-
-        service_account_key = self._get_service_account_key()
-        if service_account_key:
-            return {"service_account_key": service_account_key}
-
-        raise AirflowException(
-            "No credentials are found in connection. Specify either service 
account "
-            "authentication JSON or user OAuth token in Yandex.Cloud 
connection"
+        self.default_service_account_id = default_service_account_id or 
get_service_account_id(
+            service_account_json=self._get_field("service_account_json"),
+            
service_account_json_path=self._get_field("service_account_json_path"),
         )
+        self.client = self.sdk.client
 
     def _get_endpoint(self) -> dict[str, str]:
         sdk_config = {}
@@ -186,18 +156,6 @@ class YandexCloudBaseHook(BaseHook):
         return sdk_config
 
     def _get_field(self, field_name: str, default: Any = None) -> Any:
-        """Get field from extra, first checking short name, then for 
backcompat we check for prefixed name."""
         if not hasattr(self, "extras"):
             return default
-        backcompat_prefix = "extra__yandexcloud__"
-        if field_name.startswith("extra__"):
-            raise ValueError(
-                f"Got prefixed name {field_name}; please remove the 
'{backcompat_prefix}' prefix "
-                "when using this method."
-            )
-        if field_name in self.extras:
-            return self.extras[field_name]
-        prefixed_name = f"{backcompat_prefix}{field_name}"
-        if prefixed_name in self.extras:
-            return self.extras[prefixed_name]
-        return default
+        return get_field_from_extras(self.extras, field_name, default)
diff --git a/airflow/providers/yandex/secrets/__init__.py 
b/airflow/providers/yandex/secrets/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/yandex/secrets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/yandex/secrets/lockbox.py 
b/airflow/providers/yandex/secrets/lockbox.py
new file mode 100644
index 0000000000..adbf994873
--- /dev/null
+++ b/airflow/providers/yandex/secrets/lockbox.py
@@ -0,0 +1,280 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Objects relating to sourcing secrets from Yandex Cloud Lockbox."""
+from __future__ import annotations
+
+from functools import cached_property
+from typing import Any
+
+import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb
+import yandex.cloud.lockbox.v1.payload_service_pb2 as payload_service_pb
+import yandex.cloud.lockbox.v1.payload_service_pb2_grpc as 
payload_service_pb_grpc
+import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb
+import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb
+import yandex.cloud.lockbox.v1.secret_service_pb2_grpc as 
secret_service_pb_grpc
+import yandexcloud
+
+from airflow.models import Connection
+from airflow.providers.yandex.utils.credentials import get_credentials
+from airflow.providers.yandex.utils.defaults import default_conn_name
+from airflow.providers.yandex.utils.fields import get_field_from_extras
+from airflow.providers.yandex.utils.user_agent import provider_user_agent
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class LockboxSecretBackend(BaseSecretsBackend, LoggingMixin):
+    """
+    Retrieves Connection or Variables or Configs from Yandex Lockbox.
+
+    Configurable via ``airflow.cfg`` like so:
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+        backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+    For example, when ``{"connections_prefix": "airflow/connections"}`` is 
set, if a secret is defined with
+    the path ``airflow/connections/smtp_default``, the connection with conn_id 
``smtp_default`` would be
+    accessible.
+
+    When ``{"variables_prefix": "airflow/variables"}`` is set, if a secret is 
defined with
+    the path ``airflow/variables/hello``, the variable with the name ``hello`` 
would be accessible.
+
+    When ``{"config_prefix": "airflow/config"}`` is set, if a secret is 
defined with
+    the path ``airflow/config/sql_alchemy_conn``, the config with key 
``sql_alchemy_conn`` would be
+    accessible.
+
+    When the prefix is empty, keys will use the Lockbox Secrets without any 
prefix.
+
+    .. code-block:: ini
+
+        [secrets]
+        backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+        backend_kwargs = {"yc_connection_id": "<connection_ID>", "folder_id": 
"<folder_ID>"}
+
+    You need to specify either credentials or the ID of a yandexcloud connection 
to use when connecting to Yandex Lockbox.
+    Credentials will be used with this priority:
+
+    * OAuth Token
+    * Service Account JSON file
+    * Service Account JSON
+    * Yandex Cloud Connection
+
+    If no credentials are specified, the default connection ID will be used.
+
+    Also, you need to specify the Yandex Cloud folder ID to search for Yandex 
Lockbox secrets in.
+
+    :param yc_oauth_token: Specifies the user account OAuth token to connect 
to Yandex Lockbox with.
+        Looks like ``y3_xxxxx``.
+    :param yc_sa_key_json: Specifies the service account auth JSON.
+        Looks like ``{"id": "...", "service_account_id": "...", "private_key": 
"..."}``.
+    :param yc_sa_key_json_path: Specifies the service account auth JSON file 
path.
+        Looks like ``/home/airflow/authorized_key.json``.
+        File content looks like ``{"id": "...", "service_account_id": "...", 
"private_key": "..."}``.
+    :param yc_connection_id: Specifies the connection ID to connect to Yandex 
Lockbox with.
+        Default: "yandexcloud_default"
+    :param folder_id: Specifies the folder ID to search for Yandex Lockbox 
secrets in.
+        If set to None (null in JSON), requests will use the connection 
folder_id if specified.
+    :param connections_prefix: Specifies the prefix of the secret to read to 
get Connections.
+        If set to None (null in JSON), requests for connections will not be 
sent to Yandex Lockbox.
+        Default: "airflow/connections"
+    :param variables_prefix: Specifies the prefix of the secret to read to get 
Variables.
+        If set to None (null in JSON), requests for variables will not be sent 
to Yandex Lockbox.
+        Default: "airflow/variables"
+    :param config_prefix: Specifies the prefix of the secret to read to get 
Configurations.
+        If set to None (null in JSON), requests for configurations will not be sent 
to Yandex Lockbox.
+        Default: "airflow/config"
+    :param sep: Specifies the separator used to concatenate secret_prefix and 
secret_id.
+        Default: "/"
+    :param endpoint: Specifies an API endpoint.
+        Leave blank to use default.
+    """
+
+    def __init__(
+        self,
+        yc_oauth_token: str | None = None,
+        yc_sa_key_json: dict | str | None = None,
+        yc_sa_key_json_path: str | None = None,
+        yc_connection_id: str | None = None,
+        folder_id: str = "",
+        connections_prefix: str | None = "airflow/connections",
+        variables_prefix: str | None = "airflow/variables",
+        config_prefix: str | None = "airflow/config",
+        sep: str = "/",
+        endpoint: str | None = None,
+    ):
+        super().__init__()
+
+        self.yc_oauth_token = yc_oauth_token
+        self.yc_sa_key_json = yc_sa_key_json
+        self.yc_sa_key_json_path = yc_sa_key_json_path
+        self.yc_connection_id = None
+        if not any([yc_oauth_token, yc_sa_key_json, yc_sa_key_json_path]):
+            self.yc_connection_id = yc_connection_id or default_conn_name
+        else:
+            assert (
+                yc_connection_id is None
+            ), "yc_connection_id should not be used if other credentials are 
specified"
+
+        self.folder_id = folder_id
+        self.connections_prefix = connections_prefix.rstrip(sep) if 
connections_prefix is not None else None
+        self.variables_prefix = variables_prefix.rstrip(sep) if 
variables_prefix is not None else None
+        self.config_prefix = config_prefix.rstrip(sep) if config_prefix is not 
None else None
+        self.sep = sep
+        self.endpoint = endpoint
+
+    def get_conn_value(self, conn_id: str) -> str | None:
+        """
+        Retrieve from Secrets Backend a string value representing the 
Connection object.
+
+        :param conn_id: Connection ID
+        :return: Connection Value
+        """
+        if self.connections_prefix is None:
+            return None
+
+        if conn_id == self.yc_connection_id:
+            return None
+
+        return self._get_secret_value(self.connections_prefix, conn_id)
+
+    def get_variable(self, key: str) -> str | None:
+        """
+        Return value for Airflow Variable.
+
+        :param key: Variable Key
+        :return: Variable Value
+        """
+        if self.variables_prefix is None:
+            return None
+
+        return self._get_secret_value(self.variables_prefix, key)
+
+    def get_config(self, key: str) -> str | None:
+        """
+        Return value for Airflow Config Key.
+
+        :param key: Config Key
+        :return: Config Value
+        """
+        if self.config_prefix is None:
+            return None
+
+        return self._get_secret_value(self.config_prefix, key)
+
+    @cached_property
+    def _client(self):
+        """
+        Create a Yandex Cloud SDK client.
+
+        Lazy loading is used here
+        because we can't establish a Connection until all secrets backends 
have been initialized.
+        """
+        if self.yc_connection_id:
+            self.yc_oauth_token = self._get_field("oauth")
+            self.yc_sa_key_json = self._get_field("service_account_json")
+            self.yc_sa_key_json_path = 
self._get_field("service_account_json_path")
+            self.folder_id = self.folder_id or self._get_field("folder_id")
+
+        credentials = get_credentials(
+            oauth_token=self.yc_oauth_token,
+            service_account_json=self.yc_sa_key_json,
+            service_account_json_path=self.yc_sa_key_json_path,
+        )
+        sdk_config = self._get_endpoint()
+        return yandexcloud.SDK(user_agent=provider_user_agent(), 
**credentials, **sdk_config).client
+
+    def _get_endpoint(self) -> dict[str, str]:
+        sdk_config = {}
+
+        if self.endpoint:
+            sdk_config["endpoint"] = self.endpoint
+
+        return sdk_config
+
+    @cached_property
+    def _connection(self) -> Connection | None:
+        if not self.yc_connection_id:
+            return None
+
+        conn = Connection.get_connection_from_secrets(self.yc_connection_id)
+        self.log.info("Using connection ID '%s' for task execution.", 
conn.conn_id)
+
+        return conn
+
+    def _get_field(self, field_name: str, default: Any = None) -> Any:
+        conn = self._connection
+        if not conn:
+            return None
+
+        return get_field_from_extras(
+            extras=conn.extra_dejson,
+            field_name=field_name,
+            default=default,
+        )
+
+    def _build_secret_name(self, prefix: str, key: str):
+        if len(prefix) == 0:
+            return key
+        return f"{prefix}{self.sep}{key}"
+
+    def _get_secret_value(self, prefix: str, key: str) -> str | None:
+        secret: secret_pb.Secret = None
+        for s in self._get_secrets():
+            if s.name == self._build_secret_name(prefix=prefix, key=key):
+                secret = s
+                break
+        if not secret:
+            return None
+
+        payload = self._get_payload(secret.id, secret.current_version.id)
+        entries = {entry.key: entry.text_value for entry in payload.entries if 
entry.text_value}
+
+        if len(entries) == 0:
+            return None
+        return sorted(entries.values())[0]
+
+    def _get_secrets(self) -> list[secret_pb.Secret]:
+        response = self._list_secrets(folder_id=self.folder_id)
+
+        secrets: list[secret_pb.Secret] = response.secrets[:]
+        next_page_token = response.next_page_token
+        while next_page_token != "":
+            response = self._list_secrets(
+                folder_id=self.folder_id,
+                page_token=next_page_token,
+            )
+            secrets.extend(response.secrets)
+            next_page_token = response.next_page_token
+
+        return secrets
+
+    def _get_payload(self, secret_id: str, version_id: str) -> 
payload_pb.Payload:
+        request = payload_service_pb.GetPayloadRequest(
+            secret_id=secret_id,
+            version_id=version_id,
+        )
+        return 
self._client(payload_service_pb_grpc.PayloadServiceStub).Get(request)
+
+    def _list_secrets(self, folder_id: str, page_token: str = "") -> 
secret_service_pb.ListSecretsResponse:
+        request = secret_service_pb.ListSecretsRequest(
+            folder_id=folder_id,
+            page_token=page_token,
+        )
+        return 
self._client(secret_service_pb_grpc.SecretServiceStub).List(request)
diff --git a/airflow/providers/yandex/utils/__init__.py 
b/airflow/providers/yandex/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/yandex/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/yandex/utils/credentials.py 
b/airflow/providers/yandex/utils/credentials.py
new file mode 100644
index 0000000000..f54e8bdfbe
--- /dev/null
+++ b/airflow/providers/yandex/utils/credentials.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import logging
+from typing import Any
+
+log = logging.getLogger(__name__)
+
+
+def get_credentials(
+    oauth_token: str | None = None,
+    service_account_json: dict | str | None = None,
+    service_account_json_path: str | None = None,
+) -> dict[str, Any]:
+    """
+    Return credentials JSON for Yandex Cloud SDK based on credentials.
+
+    Credentials will be used with this priority:
+
+    * OAuth Token
+    * Service Account JSON file
+    * Service Account JSON
+    * Metadata Service
+
+    :param oauth_token: OAuth Token
+    :param service_account_json: Service Account JSON key or dict
+    :param service_account_json_path: Service Account JSON key file path
+    :return: Credentials JSON
+    """
+    if oauth_token:
+        return {"token": oauth_token}
+
+    service_account_key = get_service_account_key(
+        service_account_json=service_account_json,
+        service_account_json_path=service_account_json_path,
+    )
+    if service_account_key:
+        return {"service_account_key": service_account_key}
+
+    log.info("using metadata service as credentials")
+    return {}
+
+
+def get_service_account_key(
+    service_account_json: dict | str | None = None,
+    service_account_json_path: str | None = None,
+) -> dict[str, str] | None:
+    """
+    Return Yandex Cloud Service Account key loaded from JSON string or file.
+
+    :param service_account_json: Service Account JSON key or dict
+    :param service_account_json_path: Service Account JSON key file path
+    :return: Yandex Cloud Service Account key
+    """
+    if service_account_json_path:
+        with open(service_account_json_path) as infile:
+            service_account_json = infile.read()
+
+    if isinstance(service_account_json, dict):
+        return service_account_json
+    if service_account_json:
+        return json.loads(service_account_json)
+
+    return None
+
+
+def get_service_account_id(
+    service_account_json: dict | str | None = None,
+    service_account_json_path: str | None = None,
+) -> str | None:
+    """
+    Return Yandex Cloud Service Account ID loaded from JSON string or file.
+
+    :param service_account_json: Service Account JSON key or dict
+    :param service_account_json_path: Service Account JSON key file path
+    :return: Yandex Cloud Service Account ID
+    """
+    sa_key = get_service_account_key(
+        service_account_json=service_account_json,
+        service_account_json_path=service_account_json_path,
+    )
+    if sa_key:
+        return sa_key.get("service_account_id")
+    return None
diff --git a/airflow/providers/yandex/utils/defaults.py 
b/airflow/providers/yandex/utils/defaults.py
new file mode 100644
index 0000000000..9fac3ee845
--- /dev/null
+++ b/airflow/providers/yandex/utils/defaults.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+conn_name_attr = "yandex_conn_id"
+default_conn_name = "yandexcloud_default"
+conn_type = "yandexcloud"
+hook_name = "Yandex Cloud"
diff --git a/airflow/providers/yandex/utils/fields.py 
b/airflow/providers/yandex/utils/fields.py
new file mode 100644
index 0000000000..27cfc0b3b8
--- /dev/null
+++ b/airflow/providers/yandex/utils/fields.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any
+
+
+def get_field_from_extras(extras: dict[str, Any], field_name: str, default: 
Any = None) -> Any:
+    """
+    Get field from extras, first checking short name, then for backcompat 
checking for prefixed name.
+
+    :param extras: Dictionary with extras keys
+    :param field_name: Field name to get from extras
+    :param default: Default value if field not found
+    :return: Field value or default if not found
+    """
+    backcompat_prefix = "extra__yandexcloud__"
+    if field_name.startswith("extra__"):
+        raise ValueError(
+            f"Got prefixed name {field_name}; please remove the 
'{backcompat_prefix}' prefix "
+            "when using this function."
+        )
+    if field_name in extras:
+        return extras[field_name]
+    prefixed_name = f"{backcompat_prefix}{field_name}"
+    if prefixed_name in extras:
+        return extras[prefixed_name]
+    return default
diff --git a/airflow/providers/yandex/utils/user_agent.py 
b/airflow/providers/yandex/utils/user_agent.py
new file mode 100644
index 0000000000..08bb8e467f
--- /dev/null
+++ b/airflow/providers/yandex/utils/user_agent.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import warnings
+
+from airflow.providers.yandex.utils.defaults import conn_type, hook_name
+
+
+def provider_user_agent() -> str | None:
+    """Construct User-Agent from Airflow core & provider package versions."""
+    from airflow import __version__ as airflow_version
+    from airflow.configuration import conf
+    from airflow.providers_manager import ProvidersManager
+
+    try:
+        manager = ProvidersManager()
+        provider_name = manager.hooks[conn_type].package_name  # type: 
ignore[union-attr]
+        provider = manager.providers[provider_name]
+        return " ".join(
+            (
+                conf.get("yandex", "sdk_user_agent_prefix", fallback=""),
+                f"apache-airflow/{airflow_version}",
+                f"{provider_name}/{provider.version}",
+            )
+        ).strip()
+    except KeyError:
+        warnings.warn(
+            f"Hook '{hook_name}' info is not initialized in 
airflow.ProviderManager",
+            UserWarning,
+            stacklevel=2,
+        )
+
+    return None
diff --git a/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst 
b/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst
index 4de36ae043..c67676d828 100644
--- a/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst
+++ b/docs/apache-airflow-providers-yandex/connections/yandexcloud.rst
@@ -15,39 +15,31 @@
     specific language governing permissions and limitations
     under the License.
 
+.. _yandex_cloud_connection:
 
 Yandex.Cloud Connection
-================================
+=======================
 
 The Yandex.Cloud connection type enables the authentication in Yandex.Cloud 
services.
 
-Authenticating to Yandex.Cloud
----------------------------------
-
-Normally service account keys are used for Yandex.Cloud API authentication.
-https://cloud.yandex.com/docs/cli/operations/authentication/service-account
-
-As an alternative to service account key, user OAuth token can be used for 
authentication.
-See the https://cloud.yandex.com/docs/cli/quickstart for obtaining a user 
OAuth token.
-
-Default Connection IDs
-----------------------
-
-All hooks and operators related to Yandex.Cloud use ``yandexcloud_default`` 
connection by default.
-
 Configuring the Connection
 --------------------------
 
 Service account auth JSON
-    JSON object as a string like::
-        {"id", "...", "service_account_id": "...", "private_key": "..."}
+    JSON object as a string.
+
+    Example: ``{"id": "...", "service_account_id": "...", "private_key": 
"..."}``
 
 Service account auth JSON file path
     Path to the file containing service account auth JSON.
 
+    Example: ``/home/airflow/authorized_key.json``
+
 OAuth Token
     OAuth token as a string.
 
+    Example: ``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``
+
 SSH public key (optional)
     The key will be placed to all created Compute nodes, allowing to have a 
root shell there.
 
@@ -55,8 +47,49 @@ Folder ID (optional)
     Folder is a entity to separate different projects within the cloud.
 
     If specified, this ID will be used by default during creation of nodes and 
clusters.
+
     See 
https://cloud.yandex.com/docs/resource-manager/operations/folder/get-id for 
details
 
 Endpoint (optional)
     Set API endpoint
+
     See https://github.com/yandex-cloud/python-sdk for default
+
+Default Connection IDs
+----------------------
+
+All hooks and operators related to Yandex.Cloud use ``yandexcloud_default`` 
connection by default.
+
+Authenticating to Yandex.Cloud
+------------------------------
+
+Using Authorized keys for authorization as service account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before you start, make sure you have `created 
<https://cloud.yandex.com/en/docs/iam/operations/sa/create>`__
+a Yandex Cloud `Service Account 
<https://cloud.yandex.com/en/docs/iam/concepts/users/service-accounts>`__
+with the permissions ``lockbox.viewer`` and ``lockbox.payloadViewer``.
+
+First, you need to create `Authorized key 
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/key>`__
+for your service account and save the generated JSON file with public and 
private key parts.
+
+Then you need to specify the key in the ``Service account auth JSON`` field.
+
+Alternatively, you can specify the path to JSON file in the ``Service account 
auth JSON file path`` field.
+
+Using OAuth token for authorization as user account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+First, you need to create `OAuth token 
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/oauth-token>`__ 
for user account.
+It will look like 
``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``.
+
+Then you need to specify token in the ``OAuth Token`` field.
+
+Using metadata service
+~~~~~~~~~~~~~~~~~~~~~~
+
+If no credentials are specified, the connection will attempt to use
+the `metadata service 
<https://cloud.yandex.com/en/docs/compute/concepts/vm-metadata>`__ for 
authentication.
+
+To do this, you need to `link 
<https://cloud.yandex.ru/en/docs/compute/operations/vm-connect/auth-inside-vm>`__
+your service account with your VM.
diff --git a/docs/apache-airflow-providers-yandex/index.rst 
b/docs/apache-airflow-providers-yandex/index.rst
index 7a1736ac51..cfe0f68b30 100644
--- a/docs/apache-airflow-providers-yandex/index.rst
+++ b/docs/apache-airflow-providers-yandex/index.rst
@@ -36,6 +36,7 @@
 
     Configuration <configurations-ref>
     Connection types <connections/yandexcloud>
+    Lockbox Secret Backend 
<secrets-backends/yandex-cloud-lockbox-secret-backend>
     Operators <operators>
 
 .. toctree::
diff --git 
a/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst
 
b/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst
new file mode 100644
index 0000000000..9403dad0ea
--- /dev/null
+++ 
b/docs/apache-airflow-providers-yandex/secrets-backends/yandex-cloud-lockbox-secret-backend.rst
@@ -0,0 +1,293 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Yandex.Cloud Lockbox Secret Backend
+===================================
+
+This topic describes how to configure Apache Airflow to use `Yandex Lockbox 
<https://cloud.yandex.com/en/docs/lockbox>`__
+as a secret backend and how to manage secrets.
+
+Before you begin
+----------------
+
+Before you start, make sure you have installed the ``yandex`` provider in your 
Apache Airflow installation:
+
+.. code-block:: bash
+
+    pip install apache-airflow-providers-yandex
+
+Enabling the Yandex Lockbox secret backend
+------------------------------------------
+
+To enable Yandex Lockbox as secrets backend,
+specify 
:py:class:`~airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend`
+as the ``backend`` in  ``[secrets]`` section of ``airflow.cfg`` file.
+
+Here is a sample configuration:
+
+.. code-block:: ini
+
+    [secrets]
+    backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+
+You can also set this with an environment variable:
+
+.. code-block:: bash
+
+    export 
AIRFLOW__SECRETS__BACKEND=airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+
+You can verify the correct setting of the configuration options by using the 
``airflow config get-value`` command:
+
+.. code-block:: console
+
+    $ airflow config get-value secrets backend
+    airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+
+Backend parameters
+------------------
+
+The next step is to configure backend parameters using the ``backend_kwargs`` 
options.
+You can pass the following parameters:
+
+* ``yc_oauth_token``: Specifies the user account OAuth token to connect to 
Yandex Lockbox with. Looks like ``y3_xxxxx``.
+* ``yc_sa_key_json``: Specifies the service account auth JSON. Looks like 
``{"id": "...", "service_account_id": "...", "private_key": "..."}``.
+* ``yc_sa_key_json_path``: Specifies the service account auth JSON file path. 
Looks like ``/home/airflow/authorized_key.json``. File content looks like 
``{"id": "...", "service_account_id": "...", "private_key": "..."}``.
+* ``yc_connection_id``: Specifies the connection ID to connect to Yandex 
Lockbox with. Default: "yandexcloud_default"
+* ``folder_id``: Specifies the folder ID to search for Yandex Lockbox secrets 
in. If set to None (null in JSON), requests will use the connection folder_id 
if specified.
+* ``connections_prefix``: Specifies the prefix of the secret to read to get 
Connections. If set to None (null in JSON), requests for connections will not 
be sent to Yandex Lockbox. Default: "airflow/connections"
+* ``variables_prefix``: Specifies the prefix of the secret to read to get 
Variables. If set to None (null in JSON), requests for variables will not be 
sent to Yandex Lockbox. Default: "airflow/variables"
+* ``config_prefix``: Specifies the prefix of the secret to read to get 
Configurations. If set to None (null in JSON), requests for configurations will not 
be sent to Yandex Lockbox. Default: "airflow/config"
+* ``sep``: Specifies the separator used to concatenate secret_prefix and 
secret_id. Default: "/"
+* ``endpoint``: Specifies an API endpoint. Leave blank to use default.
+
+All options should be passed as a JSON dictionary.
+
+For example, if you want to set parameter ``connections_prefix`` to 
``"example-connections-prefix"``
+and parameter ``variables_prefix`` to ``"example-variables-prefix"``,
+your configuration file should look like this:
+
+.. code-block:: ini
+
+    [secrets]
+    backend = airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend
+    backend_kwargs = {"connections_prefix": "example-connections-prefix", 
"variables_prefix": "example-variables-prefix"}
+
+Set-up credentials
+------------------
+
+You need to specify credentials or id of yandexcloud connection to connect to 
Yandex Lockbox with.
+Credentials will be used with this priority:
+
+* OAuth Token
+* Service Account JSON file
+* Service Account JSON
+* Yandex Cloud Connection
+
+If no credentials are specified, the default connection id ``yandexcloud_default`` 
will be used.
+
+Using OAuth token for authorization as user account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+First, you need to create `OAuth token 
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/oauth-token>`__ 
for user account.
+It will look like 
``y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc``.
+
+Then you need to specify the ``folder_id`` and token in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+    [secrets]
+    backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_oauth_token": 
"y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc"}
+
+Using Authorized keys for authorization as service account
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Before you start, make sure you have `created 
<https://cloud.yandex.com/en/docs/iam/operations/sa/create>`__
+a Yandex Cloud `Service Account 
<https://cloud.yandex.com/en/docs/iam/concepts/users/service-accounts>`__
+with the permissions ``lockbox.viewer`` and ``lockbox.payloadViewer``.
+
+First, you need to create `Authorized key 
<https://cloud.yandex.com/en/docs/iam/concepts/authorization/key>`__
+for your service account and save the generated JSON file with public and 
private key parts.
+
+Then you need to specify the ``folder_id`` and key in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+    [secrets]
+    backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_sa_key_json": 
{"id": "...", "service_account_id": "...", "private_key": "..."}}
+
+Alternatively, you can specify the path to JSON file in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+    [secrets]
+    backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", 
"yc_sa_key_json_path": "/home/airflow/authorized_key.json"}
+
+Using Yandex Cloud Connection for authorization
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+First, you need to create :ref:`Yandex Cloud Connection 
<yandex_cloud_connection>`.
+
+Then you need to specify the ``yc_connection_id`` in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+    [secrets]
+    backend_kwargs = {"yc_connection_id": "my_yc_connection"}
+
+If no credentials are specified, Lockbox Secret Backend will try to use the default 
connection id ``yandexcloud_default``.
+
+Lockbox Secret Backend will try to use default folder id from Connection,
+also you can specify the ``folder_id`` in the ``backend_kwargs``:
+
+.. code-block:: ini
+
+    [secrets]
+    backend_kwargs = {"folder_id": "b1g66mft1vopnevbn57j", "yc_connection_id": 
"my_yc_connection"}
+
+Storing and Retrieving Connections
+----------------------------------
+
+To store a Connection, you need to `create secret 
<https://cloud.yandex.com/en/docs/lockbox/operations/secret-create>`__
+with a name in the format ``{connections_prefix}{sep}{connection_name}``
+and a payload that contains a text value under any key.
+
+Storing a Connection as a URI
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The main way is to save connections as a :ref:`connection URI representation 
<generating_connection_uri>`.
+
+Example: 
``mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A``
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+    $ yc lockbox secret create \
+        --name airflow/connections/mysqldb \
+        --payload '[{"key": "value", "text_value": 
"mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A"}]'
+    done (1s)
+    name: airflow/connections/mysqldb
+
+Storing a Connection as a JSON
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Alternatively, you can save connections in JSON format:
+
+.. code-block:: json
+
+    {
+      "conn_type": "mysql",
+      "host": "myhost.com",
+      "login": "myname",
+      "password": "mypassword",
+      "extra": {
+        "this_param": "some val",
+        "that_param": "other val*"
+      }
+    }
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+    $ yc lockbox secret create \
+        --name airflow/connections/mysqldbjson \
+        --payload '[{"key": "value", "text_value": "{\"conn_type\": \"mysql\", 
\"host\": \"myhost.com\", \"login\": \"myname\", \"password\": \"mypassword\", 
\"extra\": {\"this_param\": \"some val\", \"that_param\": \"other val*\"}}"}]'
+    done (1s)
+    name: airflow/connections/mysqldbjson
+
+Retrieving Connection
+~~~~~~~~~~~~~~~~~~~~~
+
+To check the connection is correctly read from the Lockbox Secret Backend, you 
can use ``airflow connections get``:
+
+.. code-block:: console
+
+    $ airflow connections get mysqldb -o json
+    [{"id": null, "conn_id": "mysqldb", "conn_type": "mysql", "description": 
null, "host": "myhost.com", "schema": "", "login": "myname", "password": 
"mypassword", "port": null, "is_encrypted": "False", "is_extra_encrypted": 
"False", "extra_dejson": {"this_param": "some val", "that_param": "other 
val*"}, "get_uri": 
"mysql://myname:mypassword@myhost.com/?this_param=some+val&that_param=other+val%2A"}]
+
+Storing and Retrieving Variables
+--------------------------------
+
+To store a Variable, you need to `create secret 
<https://cloud.yandex.com/en/docs/lockbox/operations/secret-create>`__
+with a name in the format ``{variables_prefix}{sep}{variable_name}``
+and a payload that contains a text value under any key.
+
+This is an example variable value: ``some_secret_data``
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+    $ yc lockbox secret create \
+        --name airflow/variables/my_variable \
+        --payload '[{"key": "value", "text_value": "some_secret_data"}]'
+    done (1s)
+    name: airflow/variables/my_variable
+
+To check the variable is correctly read from the Lockbox Secret Backend, you 
can use ``airflow variables get``:
+
+.. code-block:: console
+
+    $ airflow variables get my_variable
+    some_secret_data
+
+Storing and Retrieving Configs
+------------------------------
+
+You can store some sensitive configs in the Lockbox Secret Backend.
+
+For example, we will provide a secret for ``sentry.sentry_dsn`` and use 
``sentry_dsn_value`` as the config value name.
+
+To store a Config, you need to `create secret 
<https://cloud.yandex.com/en/docs/lockbox/operations/secret-create>`__
+with a name in the format ``{config_prefix}{sep}{config_value_name}``
+and a payload that contains a text value under any key.
+
+Here is an example of secret creation with the ``yc`` cli:
+
+.. code-block:: console
+
+    $ yc lockbox secret create \
+        --name airflow/config/sentry_dsn_value \
+        --payload '[{"key": "value", "text_value": 
"https://public@sentry.example.com/1"}]'
+    done (1s)
+    name: airflow/config/sentry_dsn_value
+
+Then, we need to specify the config value name as ``{key}_secret`` in the 
Apache Airflow configuration:
+
+.. code-block:: ini
+
+    [sentry]
+    sentry_dsn_secret = sentry_dsn_value
+
+To check the config value is correctly read from the Lockbox Secret Backend, 
you can use ``airflow config get-value``:
+
+.. code-block:: console
+
+    $ airflow config get-value sentry sentry_dsn
+    https://public@sentry.example.com/1
+
+Clean up
+--------
+
+You can easily delete your secret with the ``yc`` cli:
+
+.. code-block:: console
+
+    $ yc lockbox secret delete --name airflow/connections/mysqldb
+    name: airflow/connections/mysqldb
diff --git a/tests/providers/yandex/hooks/test_yandex.py 
b/tests/providers/yandex/hooks/test_yandex.py
index 23b460dabf..1ba8c800c1 100644
--- a/tests/providers/yandex/hooks/test_yandex.py
+++ b/tests/providers/yandex/hooks/test_yandex.py
@@ -19,32 +19,30 @@ from __future__ import annotations
 
 import os
 from unittest import mock
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
 
 import pytest
 
-from airflow.exceptions import AirflowException
 from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
 from tests.test_utils.config import conf_vars
 
 
 class TestYandexHook:
     @mock.patch("airflow.hooks.base.BaseHook.get_connection")
-    
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
-    def test_client_created_without_exceptions(self, get_credentials_mock, 
get_connection_mock):
+    @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+    def test_client_created_without_exceptions(self, mock_get_credentials, 
mock_get_connection):
         """tests `init` method to validate client creation when all parameters 
are passed"""
 
-        # Inputs to constructor
         default_folder_id = "test_id"
         default_public_ssh_key = "test_key"
 
         extra_dejson = '{"extras": "extra"}'
-        get_connection_mock["extra_dejson"] = "sdsd"
-        get_connection_mock.extra_dejson = '{"extras": "extra"}'
-        get_connection_mock.return_value = mock.Mock(
+        mock_get_connection["extra_dejson"] = "sdsd"
+        mock_get_connection.extra_dejson = '{"extras": "extra"}'
+        mock_get_connection.return_value = mock.Mock(
             connection_id="yandexcloud_default", extra_dejson=extra_dejson
         )
-        get_credentials_mock.return_value = {"token": 122323}
+        mock_get_credentials.return_value = {"token": 122323}
 
         hook = YandexCloudBaseHook(
             yandex_conn_id=None,
@@ -54,41 +52,54 @@ class TestYandexHook:
         assert hook.client is not None
 
     @mock.patch("airflow.hooks.base.BaseHook.get_connection")
-    def test_get_credentials_raise_exception(self, get_connection_mock):
-        """tests 'get_credentials' method raising exception if none of the 
required fields are passed."""
+    @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+    def test_provider_user_agent(self, mock_get_credentials, 
mock_get_connection):
+        mock_get_connection.return_value = 
mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
+        mock_get_credentials.return_value = {"token": 122323}
+        sdk_prefix = "MyAirflow"
 
-        # Inputs to constructor
-        default_folder_id = "test_id"
-        default_public_ssh_key = "test_key"
+        with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}):
+            hook = YandexCloudBaseHook()
+            assert hook.provider_user_agent().startswith(sdk_prefix)
 
-        extra_dejson = '{"extras": "extra"}'
-        get_connection_mock["extra_dejson"] = "sdsd"
-        get_connection_mock.extra_dejson = '{"extras": "extra"}'
-        get_connection_mock.return_value = mock.Mock(
-            connection_id="yandexcloud_default", extra_dejson=extra_dejson
-        )
+    @mock.patch("airflow.hooks.base.BaseHook.get_connection")
+    @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+    def test_sdk_user_agent(self, mock_get_credentials, mock_get_connection):
+        mock_get_connection.return_value = 
mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
+        mock_get_credentials.return_value = {"token": 122323}
+        sdk_prefix = "MyAirflow"
+
+        with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}):
+            hook = YandexCloudBaseHook()
+            assert hook.sdk._channels._client_user_agent.startswith(sdk_prefix)
 
-        with pytest.raises(AirflowException):
-            YandexCloudBaseHook(
-                yandex_conn_id=None,
-                default_folder_id=default_folder_id,
-                default_public_ssh_key=default_public_ssh_key,
-            )
+    @pytest.mark.parametrize(
+        "uri",
+        [
+            pytest.param(
+                
"a://?extra__yandexcloud__folder_id=abc&extra__yandexcloud__public_ssh_key=abc",
 id="prefix"
+            ),
+            pytest.param("a://?folder_id=abc&public_ssh_key=abc", 
id="no-prefix"),
+        ],
+    )
+    @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials", 
new=MagicMock())
+    def test_backcompat_prefix_works(self, uri):
+        with mock.patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}):
+            hook = YandexCloudBaseHook("my_conn")
+            assert hook.default_folder_id == "abc"
+            assert hook.default_public_ssh_key == "abc"
 
     @mock.patch("airflow.hooks.base.BaseHook.get_connection")
-    
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
-    def test_get_field(self, get_credentials_mock, get_connection_mock):
-        # Inputs to constructor
+    @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+    def test_get_endpoint_specified(self, mock_get_credentials, 
mock_get_connection):
         default_folder_id = "test_id"
         default_public_ssh_key = "test_key"
 
-        extra_dejson = {"one": "value_one"}
-        get_connection_mock["extra_dejson"] = "sdsd"
-        get_connection_mock.extra_dejson = '{"extras": "extra"}'
-        get_connection_mock.return_value = mock.Mock(
+        extra_dejson = {"endpoint": "my_endpoint", "something_else": 
"some_value"}
+        mock_get_connection.return_value = mock.Mock(
             connection_id="yandexcloud_default", extra_dejson=extra_dejson
         )
-        get_credentials_mock.return_value = {"token": 122323}
+        mock_get_credentials.return_value = {"token": 122323}
 
         hook = YandexCloudBaseHook(
             yandex_conn_id=None,
@@ -96,20 +107,19 @@ class TestYandexHook:
             default_public_ssh_key=default_public_ssh_key,
         )
 
-        assert hook._get_field("one") == "value_one"
+        assert hook._get_endpoint() == {"endpoint": "my_endpoint"}
 
     @mock.patch("airflow.hooks.base.BaseHook.get_connection")
-    
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
-    def test_get_endpoint_specified(self, get_credentials_mock, 
get_connection_mock):
-        # Inputs to constructor
+    @mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
+    def test_get_endpoint_unspecified(self, mock_get_credentials, 
mock_get_connection):
         default_folder_id = "test_id"
         default_public_ssh_key = "test_key"
 
-        extra_dejson = {"endpoint": "my_endpoint", "something_else": 
"some_value"}
-        get_connection_mock.return_value = mock.Mock(
+        extra_dejson = {"something_else": "some_value"}
+        mock_get_connection.return_value = mock.Mock(
             connection_id="yandexcloud_default", extra_dejson=extra_dejson
         )
-        get_credentials_mock.return_value = {"token": 122323}
+        mock_get_credentials.return_value = {"token": 122323}
 
         hook = YandexCloudBaseHook(
             yandex_conn_id=None,
@@ -117,52 +127,50 @@ class TestYandexHook:
             default_public_ssh_key=default_public_ssh_key,
         )
 
-        assert hook._get_endpoint() == {"endpoint": "my_endpoint"}
+        assert hook._get_endpoint() == {}
 
     @mock.patch("airflow.hooks.base.BaseHook.get_connection")
-    
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
-    def test_get_endpoint_unspecified(self, get_credentials_mock, 
get_connection_mock):
-        # Inputs to constructor
+    def test__get_field(self, mock_get_connection):
+        field_name = "one"
+        field_value = "value_one"
         default_folder_id = "test_id"
         default_public_ssh_key = "test_key"
+        extra_dejson = {field_name: field_value}
 
-        extra_dejson = {"something_else": "some_value"}
-        get_connection_mock.return_value = mock.Mock(
+        mock_get_connection["extra_dejson"] = "sdsd"
+        mock_get_connection.extra_dejson = '{"extras": "extra"}'
+        mock_get_connection.return_value = mock.Mock(
             connection_id="yandexcloud_default", extra_dejson=extra_dejson
         )
-        get_credentials_mock.return_value = {"token": 122323}
 
         hook = YandexCloudBaseHook(
             yandex_conn_id=None,
             default_folder_id=default_folder_id,
             default_public_ssh_key=default_public_ssh_key,
         )
+        res = hook._get_field(
+            field_name=field_name,
+        )
 
-        assert hook._get_endpoint() == {}
+        assert res == field_value
 
     @mock.patch("airflow.hooks.base.BaseHook.get_connection")
-    
@mock.patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
-    def test_sdk_user_agent(self, get_credentials_mock, get_connection_mock):
-        get_connection_mock.return_value = 
mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
-        get_credentials_mock.return_value = {"token": 122323}
-        sdk_prefix = "MyAirflow"
+    def test__get_field_extras_not_found(self, get_connection_mock):
+        field_name = "some_field"
+        default = "some_default"
+        extra_dejson = '{"extras": "extra"}'
 
-        with conf_vars({("yandex", "sdk_user_agent_prefix"): sdk_prefix}):
-            hook = YandexCloudBaseHook()
-            assert hook.sdk._channels._client_user_agent.startswith(sdk_prefix)
+        get_connection_mock["extra_dejson"] = "sdsd"
+        get_connection_mock.extra_dejson = '{"extras": "extra"}'
+        get_connection_mock.return_value = mock.Mock(
+            connection_id="yandexcloud_default", extra_dejson=extra_dejson
+        )
 
-    @pytest.mark.parametrize(
-        "uri",
-        [
-            pytest.param(
-                
"a://?extra__yandexcloud__folder_id=abc&extra__yandexcloud__public_ssh_key=abc",
 id="prefix"
-            ),
-            pytest.param("a://?folder_id=abc&public_ssh_key=abc", 
id="no-prefix"),
-        ],
-    )
-    
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials",
 new=MagicMock())
-    def test_backcompat_prefix_works(self, uri):
-        with patch.dict(os.environ, {"AIRFLOW_CONN_MY_CONN": uri}):
-            hook = YandexCloudBaseHook("my_conn")
-            assert hook.default_folder_id == "abc"
-            assert hook.default_public_ssh_key == "abc"
+        hook = YandexCloudBaseHook()
+        delattr(hook, "extras")
+        res = hook._get_field(
+            field_name=field_name,
+            default=default,
+        )
+
+        assert res == default
diff --git a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py 
b/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
index cf436a9b8c..71b94dcd57 100644
--- a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
+++ b/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
@@ -17,19 +17,10 @@
 from __future__ import annotations
 
 import json
-from unittest.mock import patch
-
-import pytest
+from unittest import mock
 
 from airflow.models import Connection
-
-try:
-    import yandexcloud
-
-    from airflow.providers.yandex.hooks.yandexcloud_dataproc import 
DataprocHook
-except ImportError:
-    yandexcloud = None
-
+from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook
 
 # Airflow connection with type "yandexcloud" must be created
 CONNECTION_ID = "yandexcloud_default"
@@ -61,23 +52,22 @@ SSH_PUBLIC_KEYS = [
     
"cFDe6faKCxH6iDRteo4D8L8BxwzN42uZSB0nfmjkIxFTcEU3mFSXEbWByg78aoddMrAAjatyrhH1pON6P0="
 ]
 
-# If Yandex.Cloud credentials are set than full test will be run. Otherwise 
only mocked tests.
+# If Yandex.Cloud credentials are set then full test will be run. Otherwise, 
only mocked tests.
 HAS_CREDENTIALS = OAUTH_TOKEN != "my_oauth_token"
 
 
[email protected](yandexcloud is None, reason="Skipping Yandex.Cloud hook 
test: no yandexcloud module")
 class TestYandexCloudDataprocHook:
     def _init_hook(self):
-        with patch("airflow.hooks.base.BaseHook.get_connection") as 
get_connection_mock:
-            get_connection_mock.return_value = self.connection
+        with mock.patch("airflow.hooks.base.BaseHook.get_connection") as 
mock_get_connection:
+            mock_get_connection.return_value = self.connection
             self.hook = DataprocHook()
 
     def setup_method(self):
         self.connection = Connection(extra=json.dumps({"oauth": OAUTH_TOKEN}))
         self._init_hook()
 
-    @patch("yandexcloud.SDK.create_operation_and_get_result")
-    def test_create_dataproc_cluster_mocked(self, create_operation_mock):
+    @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+    def test_create_dataproc_cluster_mocked(self, mock_create_operation):
         self._init_hook()
 
         self.hook.client.create_cluster(
@@ -90,16 +80,16 @@ class TestYandexCloudDataprocHook:
             cluster_image_version=CLUSTER_IMAGE_VERSION,
             service_account_id=SERVICE_ACCOUNT_ID,
         )
-        assert create_operation_mock.called
+        assert mock_create_operation.called
 
-    @patch("yandexcloud.SDK.create_operation_and_get_result")
-    def test_delete_dataproc_cluster_mocked(self, create_operation_mock):
+    @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+    def test_delete_dataproc_cluster_mocked(self, mock_create_operation):
         self._init_hook()
         self.hook.client.delete_cluster("my_cluster_id")
-        assert create_operation_mock.called
+        assert mock_create_operation.called
 
-    @patch("yandexcloud.SDK.create_operation_and_get_result")
-    def test_create_hive_job_hook(self, create_operation_mock):
+    @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+    def test_create_hive_job_hook(self, mock_create_operation):
         self._init_hook()
 
         self.hook.client.create_hive_job(
@@ -110,10 +100,10 @@ class TestYandexCloudDataprocHook:
             query="SELECT 1;",
             script_variables=None,
         )
-        assert create_operation_mock.called
+        assert mock_create_operation.called
 
-    @patch("yandexcloud.SDK.create_operation_and_get_result")
-    def test_create_mapreduce_job_hook(self, create_operation_mock):
+    @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+    def test_create_mapreduce_job_hook(self, mock_create_operation):
         self._init_hook()
 
         self.hook.client.create_mapreduce_job(
@@ -145,10 +135,10 @@ class TestYandexCloudDataprocHook:
                 "mapreduce.job.maps": "6",
             },
         )
-        assert create_operation_mock.called
+        assert mock_create_operation.called
 
-    @patch("yandexcloud.SDK.create_operation_and_get_result")
-    def test_create_spark_job_hook(self, create_operation_mock):
+    @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+    def test_create_spark_job_hook(self, mock_create_operation):
         self._init_hook()
 
         self.hook.client.create_spark_job(
@@ -170,10 +160,10 @@ class TestYandexCloudDataprocHook:
             name="Spark job",
             properties={"spark.submit.deployMode": "cluster"},
         )
-        assert create_operation_mock.called
+        assert mock_create_operation.called
 
-    @patch("yandexcloud.SDK.create_operation_and_get_result")
-    def test_create_pyspark_job_hook(self, create_operation_mock):
+    @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
+    def test_create_pyspark_job_hook(self, mock_create_operation):
         self._init_hook()
 
         self.hook.client.create_pyspark_job(
@@ -194,4 +184,4 @@ class TestYandexCloudDataprocHook:
             properties={"spark.submit.deployMode": "cluster"},
             
python_file_uris=["s3a://some-in-bucket/jobs/sources/pyspark-001/geonames.py"],
         )
-        assert create_operation_mock.called
+        assert mock_create_operation.called
diff --git a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py 
b/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
index 879645daf6..083e3d538a 100644
--- a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
+++ b/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
@@ -76,10 +76,10 @@ class TestDataprocClusterCreateOperator:
             schedule="@daily",
         )
 
-    
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+    @patch("airflow.providers.yandex.utils.credentials.get_credentials")
     @patch("airflow.hooks.base.BaseHook.get_connection")
     @patch("yandexcloud._wrappers.dataproc.Dataproc.create_cluster")
-    def test_create_cluster(self, create_cluster_mock, *_):
+    def test_create_cluster(self, mock_create_cluster, *_):
         operator = DataprocCreateClusterOperator(
             task_id="create_cluster",
             ssh_public_keys=SSH_PUBLIC_KEYS,
@@ -93,7 +93,7 @@ class TestDataprocClusterCreateOperator:
         )
         context = {"task_instance": MagicMock()}
         operator.execute(context)
-        create_cluster_mock.assert_called_once_with(
+        mock_create_cluster.assert_called_once_with(
             cluster_description="",
             cluster_image_version="1.4",
             cluster_name=None,
@@ -135,15 +135,15 @@ class TestDataprocClusterCreateOperator:
         )
         context["task_instance"].xcom_push.assert_has_calls(
             [
-                call(key="cluster_id", 
value=create_cluster_mock().response.id),
+                call(key="cluster_id", 
value=mock_create_cluster().response.id),
                 call(key="yandexcloud_connection_id", value=CONNECTION_ID),
             ]
         )
 
-    
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+    @patch("airflow.providers.yandex.utils.credentials.get_credentials")
     @patch("airflow.hooks.base.BaseHook.get_connection")
     @patch("yandexcloud._wrappers.dataproc.Dataproc.delete_cluster")
-    def test_delete_cluster_operator(self, delete_cluster_mock, *_):
+    def test_delete_cluster_operator(self, mock_delete_cluster, *_):
         operator = DataprocDeleteClusterOperator(
             task_id="delete_cluster",
             connection_id=CONNECTION_ID,
@@ -152,12 +152,12 @@ class TestDataprocClusterCreateOperator:
         context["task_instance"].xcom_pull.return_value = "my_cluster_id"
         operator.execute(context)
         
context["task_instance"].xcom_pull.assert_called_once_with(key="cluster_id")
-        delete_cluster_mock.assert_called_once_with("my_cluster_id")
+        mock_delete_cluster.assert_called_once_with("my_cluster_id")
 
-    
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+    @patch("airflow.providers.yandex.utils.credentials.get_credentials")
     @patch("airflow.hooks.base.BaseHook.get_connection")
     @patch("yandexcloud._wrappers.dataproc.Dataproc.create_hive_job")
-    def test_create_hive_job_operator(self, create_hive_job_mock, *_):
+    def test_create_hive_job_operator(self, mock_create_hive_job, *_):
         operator = DataprocCreateHiveJobOperator(
             task_id="create_hive_job",
             query="SELECT 1;",
@@ -173,7 +173,7 @@ class TestDataprocClusterCreateOperator:
             ]
         )
 
-        create_hive_job_mock.assert_called_once_with(
+        mock_create_hive_job.assert_called_once_with(
             cluster_id="my_cluster_id",
             continue_on_failure=False,
             name="Hive job",
@@ -183,10 +183,10 @@ class TestDataprocClusterCreateOperator:
             script_variables=None,
         )
 
-    
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+    @patch("airflow.providers.yandex.utils.credentials.get_credentials")
     @patch("airflow.hooks.base.BaseHook.get_connection")
     @patch("yandexcloud._wrappers.dataproc.Dataproc.create_mapreduce_job")
-    def test_create_mapreduce_job_operator(self, create_mapreduce_job_mock, 
*_):
+    def test_create_mapreduce_job_operator(self, mock_create_mapreduce_job, 
*_):
         operator = DataprocCreateMapReduceJobOperator(
             task_id="run_mapreduce_job",
             main_class="org.apache.hadoop.streaming.HadoopStreaming",
@@ -223,7 +223,7 @@ class TestDataprocClusterCreateOperator:
             ]
         )
 
-        create_mapreduce_job_mock.assert_called_once_with(
+        mock_create_mapreduce_job.assert_called_once_with(
             archive_uris=None,
             args=[
                 "-mapper",
@@ -253,10 +253,10 @@ class TestDataprocClusterCreateOperator:
             },
         )
 
-    
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+    @patch("airflow.providers.yandex.utils.credentials.get_credentials")
     @patch("airflow.hooks.base.BaseHook.get_connection")
     @patch("yandexcloud._wrappers.dataproc.Dataproc.create_spark_job")
-    def test_create_spark_job_operator(self, create_spark_job_mock, *_):
+    def test_create_spark_job_operator(self, mock_create_spark_job, *_):
         operator = DataprocCreateSparkJobOperator(
             task_id="create_spark_job",
             
main_jar_file_uri="s3a://data-proc-public/jobs/sources/java/dataproc-examples-1.0.jar",
@@ -292,7 +292,7 @@ class TestDataprocClusterCreateOperator:
             ]
         )
 
-        create_spark_job_mock.assert_called_once_with(
+        mock_create_spark_job.assert_called_once_with(
             
archive_uris=["s3a://some-in-bucket/jobs/sources/data/country-codes.csv.zip"],
             args=[
                 "s3a://some-in-bucket/jobs/sources/data/cities500.txt.bz2",
@@ -315,10 +315,10 @@ class TestDataprocClusterCreateOperator:
             exclude_packages=None,
         )
 
-    
@patch("airflow.providers.yandex.hooks.yandex.YandexCloudBaseHook._get_credentials")
+    @patch("airflow.providers.yandex.utils.credentials.get_credentials")
     @patch("airflow.hooks.base.BaseHook.get_connection")
     @patch("yandexcloud._wrappers.dataproc.Dataproc.create_pyspark_job")
-    def test_create_pyspark_job_operator(self, create_pyspark_job_mock, *_):
+    def test_create_pyspark_job_operator(self, mock_create_pyspark_job, *_):
         operator = DataprocCreatePysparkJobOperator(
             task_id="create_pyspark_job",
             
main_python_file_uri="s3a://some-in-bucket/jobs/sources/pyspark-001/main.py",
@@ -355,7 +355,7 @@ class TestDataprocClusterCreateOperator:
             ]
         )
 
-        create_pyspark_job_mock.assert_called_once_with(
+        mock_create_pyspark_job.assert_called_once_with(
             
archive_uris=["s3a://some-in-bucket/jobs/sources/data/country-codes.csv.zip"],
             args=[
                 "s3a://some-in-bucket/jobs/sources/data/cities500.txt.bz2",
diff --git a/tests/providers/yandex/secrets/__init__.py 
b/tests/providers/yandex/secrets/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/yandex/secrets/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/yandex/secrets/test_lockbox.py 
b/tests/providers/yandex/secrets/test_lockbox.py
new file mode 100644
index 0000000000..e51724f866
--- /dev/null
+++ b/tests/providers/yandex/secrets/test_lockbox.py
@@ -0,0 +1,435 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest.mock import MagicMock, Mock, patch
+
+import yandex.cloud.lockbox.v1.payload_pb2 as payload_pb
+import yandex.cloud.lockbox.v1.secret_pb2 as secret_pb
+import yandex.cloud.lockbox.v1.secret_service_pb2 as secret_service_pb
+
+from airflow.providers.yandex.secrets.lockbox import LockboxSecretBackend
+from airflow.providers.yandex.utils.defaults import default_conn_name
+
+
+class TestLockboxSecretBackend:
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def test_yandex_lockbox_secret_backend_get_connection(self, 
mock_get_value):
+        conn_id = "fake_conn"
+        conn_type = "scheme"
+        host = "host"
+        login = "user"
+        password = "pass"
+        port = 100
+        uri = f"{conn_type}://{login}:{password}@{host}:{port}"
+
+        mock_get_value.return_value = uri
+
+        conn = LockboxSecretBackend().get_connection(conn_id)
+
+        assert conn.conn_id == conn_id
+        assert conn.conn_type == conn_type
+        assert conn.host == host
+        assert conn.schema == ""
+        assert conn.login == login
+        assert conn.password == password
+        assert conn.port == port
+        assert conn.get_uri() == uri
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def test_yandex_lockbox_secret_backend_get_connection_from_json(self, 
mock_get_value):
+        conn_id = "airflow_to_yandexcloud"
+        conn_type = "yandex_cloud"
+        extra = "some extra values"
+        c = {
+            "conn_type": conn_type,
+            "extra": extra,
+        }
+
+        mock_get_value.return_value = json.dumps(c)
+
+        conn = LockboxSecretBackend().get_connection(conn_id)
+
+        assert conn.conn_id == conn_id
+        assert conn.conn_type == conn_type
+        assert conn.extra == extra
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def test_yandex_lockbox_secret_backend_get_variable(self, mock_get_value):
+        k = "thisiskey"
+        v = "thisisvalue"
+
+        mock_get_value.return_value = v
+
+        value = LockboxSecretBackend().get_variable(k)
+
+        assert value == v
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def test_yandex_lockbox_secret_backend_get_config(self, mock_get_value):
+        k = "thisiskey"
+        v = "thisisvalue"
+
+        mock_get_value.return_value = v
+
+        value = LockboxSecretBackend().get_config(k)
+
+        assert value == v
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def test_yandex_lockbox_secret_backend_get_connection_prefix_is_none(self, 
mock_get_value):
+        uri = "scheme://user:pass@host:100"
+
+        mock_get_value.return_value = uri
+
+        conn = LockboxSecretBackend(
+            connections_prefix=None,
+        ).get_connection("fake_conn")
+
+        assert conn is None
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def 
test_yandex_lockbox_secret_backend_get_connection_with_oauth_token_auth(self, 
mock_get_value):
+        conn_id = "yandex_cloud"
+        uri = "scheme://user:pass@host:100"
+
+        mock_get_value.return_value = uri
+
+        conn = LockboxSecretBackend(
+            
yc_oauth_token="y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc",
+        ).get_connection(conn_id)
+
+        assert conn.conn_id == conn_id
+        assert conn.get_uri() == uri
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def 
test_yandex_lockbox_secret_backend_get_connection_conn_id_for_backend(self, 
mock_get_value):
+        conn_id = "yandex_cloud"
+        uri = "scheme://user:pass@host:100"
+
+        mock_get_value.return_value = uri
+
+        conn = LockboxSecretBackend(
+            yc_connection_id=conn_id,
+        ).get_connection(conn_id)
+
+        assert conn is None
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def 
test_yandex_lockbox_secret_backend_get_connection_default_conn_id(self, 
mock_get_value):
+        conn_id = default_conn_name
+        uri = "scheme://user:pass@host:100"
+
+        mock_get_value.return_value = uri
+
+        conn = LockboxSecretBackend().get_connection(conn_id)
+
+        assert conn is None
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def test_yandex_lockbox_secret_backend_get_variable_prefix_is_none(self, 
mock_get_value):
+        k = "thisiskey"
+        v = "thisisvalue"
+
+        mock_get_value.return_value = v
+
+        value = LockboxSecretBackend(
+            variables_prefix=None,
+        ).get_variable(k)
+
+        assert value is None
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secret_value")
+    def test_yandex_lockbox_secret_backend_get_config_prefix_is_none(self, 
mock_get_value):
+        k = "thisiskey"
+        v = "thisisvalue"
+
+        mock_get_value.return_value = v
+
+        value = LockboxSecretBackend(
+            config_prefix=None,
+        ).get_config(k)
+
+        assert value is None
+
+    def 
test_yandex_lockbox_secret_backend__client_created_without_exceptions(self):
+        yc_oauth_token = 
"y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc"
+
+        sm = LockboxSecretBackend(
+            yc_oauth_token=yc_oauth_token,
+        )
+
+        assert sm._client is not None
+
+    def test_yandex_lockbox_secret_backedn__get_endpoint(self):
+        endpoint = "api.cloud.yandex.net"
+        expected = {
+            "endpoint": endpoint,
+        }
+
+        res = LockboxSecretBackend(
+            endpoint=endpoint,
+        )._get_endpoint()
+
+        assert res == expected
+
+    def test_yandex_lockbox_secret_backedn__get_endpoint_not_specified(self):
+        expected = {}
+
+        res = LockboxSecretBackend()._get_endpoint()
+
+        assert res == expected
+
+    def test_yandex_lockbox_secret_backend__build_secret_name(self):
+        prefix = "thiisprefix"
+        key = "thisiskey"
+        expected = "thiisprefix/thisiskey"
+
+        res = LockboxSecretBackend()._build_secret_name(prefix, key)
+
+        assert res == expected
+
+    def test_yandex_lockbox_secret_backend__build_secret_name_no_prefix(self):
+        prefix = ""
+        key = "thisiskey"
+        expected = "thisiskey"
+
+        res = LockboxSecretBackend()._build_secret_name(prefix, key)
+
+        assert res == expected
+
+    def test_yandex_lockbox_secret_backend__build_secret_name_custom_sep(self):
+        sep = "_"
+        prefix = "thiisprefix"
+        key = "thisiskey"
+        expected = "thiisprefix_thisiskey"
+
+        res = LockboxSecretBackend(
+            sep=sep,
+        )._build_secret_name(prefix, key)
+
+        assert res == expected
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets")
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload")
+    def test_yandex_lockbox_secret_backend__get_secret_value(self, 
mock_get_payload, mock_get_secrets):
+        target_name = "target_name"
+        target_text = "target_text"
+
+        mock_get_secrets.return_value = [
+            secret_pb.Secret(
+                id="123",
+                name="one",
+            ),
+            secret_pb.Secret(
+                id="456",
+                name=target_name,
+            ),
+            secret_pb.Secret(
+                id="789",
+                name="two",
+            ),
+        ]
+        mock_get_payload.return_value = payload_pb.Payload(
+            entries=[
+                payload_pb.Payload.Entry(text_value=target_text),
+            ],
+        )
+
+        res = LockboxSecretBackend()._get_secret_value("", target_name)
+
+        assert res == target_text
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets")
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload")
+    def test_yandex_lockbox_secret_backend__get_secret_value_not_found(
+        self, mock_get_payload, mock_get_secrets
+    ):
+        target_name = "target_name"
+        target_text = "target_text"
+
+        mock_get_secrets.return_value = [
+            secret_pb.Secret(
+                id="123",
+                name="one",
+            ),
+            secret_pb.Secret(
+                id="789",
+                name="two",
+            ),
+        ]
+        mock_get_payload.return_value = payload_pb.Payload(
+            entries=[
+                payload_pb.Payload.Entry(text_value=target_text),
+            ],
+        )
+
+        res = LockboxSecretBackend()._get_secret_value("", target_name)
+
+        assert res is None
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_secrets")
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._get_payload")
+    def test_yandex_lockbox_secret_backend__get_secret_value_no_text_entries(
+        self, mock_get_payload, mock_get_secrets
+    ):
+        target_name = "target_name"
+        target_value = b"01010101"
+
+        mock_get_secrets.return_value = [
+            secret_pb.Secret(
+                id="123",
+                name="one",
+            ),
+            secret_pb.Secret(
+                id="456",
+                name="two",
+            ),
+            secret_pb.Secret(
+                id="789",
+                name=target_name,
+            ),
+        ]
+        mock_get_payload.return_value = payload_pb.Payload(
+            entries=[
+                payload_pb.Payload.Entry(binary_value=target_value),
+            ],
+        )
+
+        res = LockboxSecretBackend()._get_secret_value("", target_name)
+
+        assert res is None
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._list_secrets")
+    def test_yandex_lockbox_secret_backend__get_secrets(self, 
mock_list_secrets):
+        secrets = secret_service_pb.ListSecretsResponse(
+            secrets=[
+                secret_pb.Secret(
+                    id="123",
+                ),
+                secret_pb.Secret(
+                    id="456",
+                ),
+            ],
+        )
+
+        mock_list_secrets.return_value = secrets
+
+        res = LockboxSecretBackend(
+            folder_id="someid",
+        )._get_secrets()
+
+        assert res == secrets.secrets
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._list_secrets")
+    def test_yandex_lockbox_secret_backend__get_secrets_page_token(self, 
mock_list_secrets):
+        first_secrets = secret_service_pb.ListSecretsResponse(
+            secrets=[
+                secret_pb.Secret(
+                    id="123",
+                ),
+                secret_pb.Secret(
+                    id="456",
+                ),
+            ],
+            next_page_token="token",
+        )
+        second_secrets = secret_service_pb.ListSecretsResponse(
+            secrets=[
+                secret_pb.Secret(
+                    id="789",
+                ),
+                secret_pb.Secret(
+                    id="000",
+                ),
+            ],
+            next_page_token="",
+        )
+
+        mock_list_secrets.side_effect = [
+            first_secrets,
+            second_secrets,
+        ]
+
+        res = LockboxSecretBackend(
+            folder_id="someid",
+        )._get_secrets()
+
+        assert res == [*first_secrets.secrets, *second_secrets.secrets]
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._client")
+    def test_yandex_lockbox_secret_backend__get_payload(self, mock_client):
+        mock_stub = MagicMock()
+        mock_response = payload_pb.Payload()
+        mock_stub.Get.return_value = mock_response
+        mock_client.return_value = mock_stub
+
+        result = LockboxSecretBackend()._get_payload(
+            secret_id="test_secret",
+            version_id="test_version",
+        )
+
+        mock_client.assert_called_once()
+        mock_stub.Get.assert_called_once()
+        assert result == mock_response
+
+    
@patch("airflow.providers.yandex.secrets.lockbox.LockboxSecretBackend._client")
+    def test_yandex_lockbox_secret_backend__list_secrets(self, mock_client):
+        mock_stub = MagicMock()
+        mock_response = secret_service_pb.ListSecretsResponse()
+        mock_stub.List.return_value = mock_response
+        mock_client.return_value = mock_stub
+
+        result = LockboxSecretBackend()._list_secrets(
+            folder_id="test_folder",
+        )
+
+        mock_client.assert_called_once()
+        mock_stub.List.assert_called_once()
+        assert result == mock_response
+
+    def test_yandex_lockbox_secret_backend_folder_id(self):
+        folder_id = "id1"
+
+        res = LockboxSecretBackend(
+            folder_id=folder_id,
+        ).folder_id
+
+        assert res == folder_id
+
+    @patch("airflow.models.connection.Connection.get_connection_from_secrets")
+    def test_yandex_lockbox_secret_backend_folder_id_from_connection(self, 
mock_get_connection):
+        folder_id = "id1"
+
+        mock_get_connection.return_value = Mock(
+            connection_id=default_conn_name,
+            extra_dejson={"folder_id": folder_id},
+        )
+
+        sm = LockboxSecretBackend()
+        _ = sm._client
+        res = sm.folder_id
+
+        assert res == folder_id
+
+    def 
test_yandex_lockbox_secret_backend__get_field_connection_not_specified(self):
+        sm = LockboxSecretBackend()
+        sm.yc_connection_id = None
+        res = sm._get_field("somefield")
+
+        assert res is None
diff --git a/tests/providers/yandex/utils/__init__.py 
b/tests/providers/yandex/utils/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/yandex/utils/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/yandex/utils/test_credentials.py 
b/tests/providers/yandex/utils/test_credentials.py
new file mode 100644
index 0000000000..5bc1e17490
--- /dev/null
+++ b/tests/providers/yandex/utils/test_credentials.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+from unittest import mock
+
+from airflow.providers.yandex.utils.credentials import (
+    get_credentials,
+    get_service_account_id,
+    get_service_account_key,
+)
+
+
+def test_get_credentials_oauth_token():
+    oauth_token = "y3_Vdheub7w9bIut67GHeL345gfb5GAnd3dZnf08FRbvjeUFvetYiohGvc"
+    service_account_key = {
+        "id": "...",
+        "service_account_id": "...",
+        "private_key": "...",
+    }
+    service_account_key_json = json.dumps(service_account_key)
+    service_account_file_path = "/home/airflow/authorized_key.json"
+    expected = {"token": oauth_token}
+
+    res = get_credentials(
+        oauth_token=oauth_token,
+        service_account_json=service_account_key_json,
+        service_account_json_path=service_account_file_path,
+    )
+
+    assert res == expected
+
+
[email protected]("airflow.providers.yandex.utils.credentials.get_service_account_key")
+def test_get_credentials_service_account_key(mock_get_service_account_key):
+    service_account_key = {
+        "id": "...",
+        "service_account_id": "...",
+        "private_key": "...",
+    }
+    service_account_key_json = json.dumps(service_account_key)
+    service_account_file_path = "/home/airflow/authorized_key.json"
+    expected = {"service_account_key": service_account_key}
+
+    mock_get_service_account_key.return_value = service_account_key
+
+    res = get_credentials(
+        service_account_json=service_account_key_json,
+        service_account_json_path=service_account_file_path,
+    )
+
+    assert res == expected
+
+
+def test_get_credentials_metadata_service(caplog):
+    expected = {}
+
+    res = get_credentials()
+
+    assert res == expected
+    assert "using metadata service as credentials" in caplog.text
+
+
+def test_get_service_account_key():
+    service_account_key = {
+        "id": "...",
+        "service_account_id": "...",
+        "private_key": "...",
+    }
+    service_account_key_json = json.dumps(service_account_key)
+    expected = service_account_key
+
+    res = get_service_account_key(
+        service_account_json=service_account_key_json,
+    )
+
+    assert res == expected
+
+
+def test_get_service_account_dict():
+    service_account_key = {
+        "id": "...",
+        "service_account_id": "...",
+        "private_key": "...",
+    }
+    expected = service_account_key
+
+    res = get_service_account_key(
+        service_account_json=service_account_key,
+    )
+
+    assert res == expected
+
+
+def test_get_service_account_key_file(tmp_path):
+    service_account_key = {
+        "id": "...",
+        "service_account_id": "...",
+        "private_key": "...",
+    }
+    service_account_key_json = json.dumps(service_account_key)
+    service_account_file = tmp_path / "authorized_key.json"
+    service_account_file.write_text(service_account_key_json)
+    service_account_file_path = str(service_account_file)
+    expected = service_account_key
+
+    res = get_service_account_key(
+        service_account_json=service_account_key_json,
+        service_account_json_path=service_account_file_path,
+    )
+
+    assert res == expected
+
+
+def test_get_service_account_key_none():
+    expected = None
+
+    res = get_service_account_key()
+
+    assert res == expected
+
+
[email protected]("airflow.providers.yandex.utils.credentials.get_service_account_key")
+def test_get_service_account_id(mock_get_service_account_key):
+    service_account_id = "this_is_service_account_id"
+    service_account_key = {
+        "id": "...",
+        "service_account_id": service_account_id,
+        "private_key": "...",
+    }
+    service_account_key_json = json.dumps(service_account_key)
+    service_account_file_path = "/home/airflow/authorized_key.json"
+    expected = service_account_id
+
+    mock_get_service_account_key.return_value = service_account_key
+
+    res = get_service_account_id(
+        service_account_json=service_account_key_json,
+        service_account_json_path=service_account_file_path,
+    )
+
+    assert res == expected
+
+
[email protected]("airflow.providers.yandex.utils.credentials.get_service_account_key")
+def test_get_service_account_id_none(mock_get_service_account_key):
+    expected = None
+
+    mock_get_service_account_key.return_value = None
+
+    res = get_service_account_id()
+
+    assert res == expected
diff --git a/tests/providers/yandex/utils/test_defaults.py 
b/tests/providers/yandex/utils/test_defaults.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/yandex/utils/test_defaults.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/yandex/utils/test_fields.py b/tests/providers/yandex/utils/test_fields.py
new file mode 100644
index 0000000000..3b42282324
--- /dev/null
+++ b/tests/providers/yandex/utils/test_fields.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.yandex.utils.fields import get_field_from_extras
+
+
+def test_get_field_from_extras():
+    field_name = "somefield"
+    default = None
+    expected = "somevalue"
+    extras = {
+        field_name: expected,
+    }
+
+    res = get_field_from_extras(
+        extras=extras,
+        field_name=field_name,
+        default=default,
+    )
+
+    assert res == expected
+
+
+def test_get_field_from_extras_not_found():
+    field_name = "somefield"
+    default = "default"
+    expected = default
+    extras = {}
+
+    res = get_field_from_extras(
+        extras=extras,
+        field_name=field_name,
+        default=default,
+    )
+
+    assert res == expected
+
+
+def test_get_field_from_extras_prefixed_in_extra():
+    field_name = "somefield"
+    default = None
+    expected = "somevalue"
+    extras = {
+        f"extra__yandexcloud__{field_name}": expected,
+    }
+
+    res = get_field_from_extras(
+        extras=extras,
+        field_name=field_name,
+        default=default,
+    )
+
+    assert res == expected
+
+
+def test_get_field_from_extras_field_name_with_extra_raise_exception():
+    field_name = "extra__yandexcloud__fieldname"
+    default = None
+    extras = {}
+
+    with pytest.raises(ValueError):
+        get_field_from_extras(
+            extras=extras,
+            field_name=field_name,
+            default=default,
+        )
diff --git a/tests/providers/yandex/utils/test_user_agent.py b/tests/providers/yandex/utils/test_user_agent.py
new file mode 100644
index 0000000000..854549e391
--- /dev/null
+++ b/tests/providers/yandex/utils/test_user_agent.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+from airflow.providers.yandex.utils.user_agent import provider_user_agent
+
+
+def test_provider_user_agent():
+    user_agent = provider_user_agent()
+
+    from airflow import __version__ as airflow_version
+
+    user_agent_airflow = f"apache-airflow/{airflow_version}"
+    assert user_agent_airflow in user_agent
+
+    from airflow.providers_manager import ProvidersManager
+
+    manager = ProvidersManager()
+    provider_name = manager.hooks["yandexcloud"].package_name
+    provider = manager.providers[provider_name]
+    user_agent_provider = f"{provider_name}/{provider.version}"
+    assert user_agent_provider in user_agent
+
+    from airflow.configuration import conf
+
+    user_agent_prefix = conf.get("yandex", "sdk_user_agent_prefix", fallback="")
+    assert user_agent_prefix in user_agent
+
+
+@mock.patch("airflow.providers_manager.ProvidersManager.hooks")
+def test_provider_user_agent_hook_not_exists(mock_hooks):
+    mock_hooks.return_value = []
+
+    user_agent = provider_user_agent()
+
+    assert user_agent is None

Reply via email to