This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new e0dd075d1b AIP-21: yandexcloud: rename files, emit deprecation warning (#39618)
e0dd075d1b is described below
commit e0dd075d1b4ef48bdae5a9a690a27518e4438104
Author: uzhastik <[email protected]>
AuthorDate: Wed May 15 21:19:05 2024 +0300
AIP-21: yandexcloud: rename files, emit deprecation warning (#39618)
* AIP-21: rename files, emit deprecation warning
* revert most of the changes
* fix static checks
* remove first line
* revert file
* fix static checks
* revert
* remove 4 ignores
* ignore missing tests for yandexcloud deprecated modules
---
.../hooks/{yandexcloud_dataproc.py => dataproc.py} | 0
.../providers/yandex/hooks/yandexcloud_dataproc.py | 25 +-
airflow/providers/yandex/hooks/yq.py | 15 +-
.../{yandexcloud_dataproc.py => dataproc.py} | 2 +-
.../yandex/operators/yandexcloud_dataproc.py | 521 +--------------------
airflow/providers/yandex/provider.yaml | 7 +-
generated/provider_dependencies.json | 2 +-
.../in_container/run_provider_yaml_files_check.py | 2 +
tests/always/test_project_structure.py | 2 +
tests/deprecations_ignore.yml | 4 -
...st_yandexcloud_dataproc.py => test_dataproc.py} | 2 +-
tests/providers/yandex/hooks/test_yandex.py | 14 +-
tests/providers/yandex/hooks/test_yq.py | 21 +-
...st_yandexcloud_dataproc.py => test_dataproc.py} | 10 +-
.../yandex/example_yandexcloud_dataproc.py | 2 +-
.../example_yandexcloud_dataproc_lightweight.py | 2 +-
16 files changed, 50 insertions(+), 581 deletions(-)
diff --git a/airflow/providers/yandex/hooks/yandexcloud_dataproc.py b/airflow/providers/yandex/hooks/dataproc.py
similarity index 100%
copy from airflow/providers/yandex/hooks/yandexcloud_dataproc.py
copy to airflow/providers/yandex/hooks/dataproc.py
diff --git a/airflow/providers/yandex/hooks/yandexcloud_dataproc.py b/airflow/providers/yandex/hooks/yandexcloud_dataproc.py
index 9b1862205e..6256769c92 100644
--- a/airflow/providers/yandex/hooks/yandexcloud_dataproc.py
+++ b/airflow/providers/yandex/hooks/yandexcloud_dataproc.py
@@ -14,22 +14,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from __future__ import annotations
-
-from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
+"""This module is deprecated. Please use
:mod:`airflow.providers.yandex.hooks.dataproc` instead."""
+from __future__ import annotations
-class DataprocHook(YandexCloudBaseHook):
- """
- A base hook for Yandex.Cloud Data Proc.
+import warnings
- :param yandex_conn_id: The connection ID to use when fetching connection
info.
- """
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.yandex.hooks.dataproc import * # noqa: F403
- def __init__(self, *args, **kwargs) -> None:
- super().__init__(*args, **kwargs)
- self.cluster_id = None
- self.client = self.sdk.wrappers.Dataproc(
- default_folder_id=self.default_folder_id,
- default_public_ssh_key=self.default_public_ssh_key,
- )
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.providers.yandex.hooks.dataproc` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+)
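For code that imports the hook from the old module path, the practical change is only the import itself; a minimal before/after sketch based on the paths in this diff:

    # deprecated path - importing it now emits AirflowProviderDeprecationWarning
    from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook

    # new path introduced by this commit
    from airflow.providers.yandex.hooks.dataproc import DataprocHook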
diff --git a/airflow/providers/yandex/hooks/yq.py b/airflow/providers/yandex/hooks/yq.py
index 37f7550df6..c62595fe0a 100644
--- a/airflow/providers/yandex/hooks/yq.py
+++ b/airflow/providers/yandex/hooks/yq.py
@@ -19,9 +19,7 @@ from __future__ import annotations
from datetime import timedelta
from typing import Any
-import yandexcloud
-import yandexcloud._auth_fabric as auth_fabric
-from yandex.cloud.iam.v1.iam_token_service_pb2_grpc import IamTokenServiceStub
+import yandexcloud.auth as yc_auth
from yandex_query_client import YQHttpClient, YQHttpClientConfig
from airflow.providers.yandex.hooks.yandex import YandexCloudBaseHook
@@ -100,13 +98,4 @@ class YQHook(YandexCloudBaseHook):
        if iam_token is not None:
            return iam_token
-        service_account_key = self.credentials.get("service_account_key")
-        # if service_account_key is None metadata server will be used
-        token_requester = auth_fabric.get_auth_token_requester(service_account_key=service_account_key)
-
-        if service_account_key is None:
-            return token_requester.get_token()
-
-        sdk = yandexcloud.SDK()
-        client = sdk.client(IamTokenServiceStub)
-        return client.Create(token_requester.get_token_request()).iam_token
+        return yc_auth.get_auth_token(service_account_key=self.credentials.get("service_account_key"))
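Note for readers: this hunk replaces the hand-rolled IAM token exchange (building a token requester and calling the gRPC IamTokenService) with the SDK helper imported above. A rough usage sketch, assuming the yandexcloud.auth.get_auth_token helper shipped with the yandexcloud>=0.278.0 pin further down; the variable name is illustrative:

    import yandexcloud.auth as yc_auth

    # service_account_key comes from the Airflow connection credentials; passing None
    # falls back to metadata-server authentication, as the removed comment noted.
    service_account_key = None  # illustrative placeholder
    iam_token = yc_auth.get_auth_token(service_account_key=service_account_key)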
diff --git a/airflow/providers/yandex/operators/yandexcloud_dataproc.py b/airflow/providers/yandex/operators/dataproc.py
similarity index 99%
copy from airflow/providers/yandex/operators/yandexcloud_dataproc.py
copy to airflow/providers/yandex/operators/dataproc.py
index 49bf136c3f..94bf096b66 100644
--- a/airflow/providers/yandex/operators/yandexcloud_dataproc.py
+++ b/airflow/providers/yandex/operators/dataproc.py
@@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Iterable, Sequence
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
-from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook
+from airflow.providers.yandex.hooks.dataproc import DataprocHook
if TYPE_CHECKING:
from airflow.utils.context import Context
diff --git a/airflow/providers/yandex/operators/yandexcloud_dataproc.py b/airflow/providers/yandex/operators/yandexcloud_dataproc.py
index 49bf136c3f..1f7db5d512 100644
--- a/airflow/providers/yandex/operators/yandexcloud_dataproc.py
+++ b/airflow/providers/yandex/operators/yandexcloud_dataproc.py
@@ -14,522 +14,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+"""This module is deprecated. Please use
:mod:`airflow.providers.yandex.operators.dataproc` instead."""
+
from __future__ import annotations
import warnings
-from dataclasses import dataclass
-from typing import TYPE_CHECKING, Iterable, Sequence
from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.models import BaseOperator
-from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook
-
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
-
-@dataclass
-class InitializationAction:
- """Data for initialization action to be run at start of DataProc
cluster."""
-
- uri: str # Uri of the executable file
- args: Sequence[str] # Arguments to the initialization action
- timeout: int # Execution timeout
-
-
-class DataprocCreateClusterOperator(BaseOperator):
- """Creates Yandex.Cloud Data Proc cluster.
-
- :param folder_id: ID of the folder in which cluster should be created.
- :param cluster_name: Cluster name. Must be unique inside the folder.
- :param cluster_description: Cluster description.
- :param cluster_image_version: Cluster image version. Use default.
- :param ssh_public_keys: List of SSH public keys that will be deployed to
created compute instances.
- :param subnet_id: ID of the subnetwork. All Data Proc cluster nodes will
use one subnetwork.
- :param services: List of services that will be installed to the cluster.
Possible options:
- HDFS, YARN, MAPREDUCE, HIVE, TEZ, ZOOKEEPER, HBASE, SQOOP, FLUME,
SPARK, SPARK, ZEPPELIN, OOZIE
- :param s3_bucket: Yandex.Cloud S3 bucket to store cluster logs.
- Jobs will not work if the bucket is not specified.
- :param zone: Availability zone to create cluster in.
- Currently there are ru-central1-a, ru-central1-b and
ru-central1-c.
- :param service_account_id: Service account id for the cluster.
- Service account can be created inside the
folder.
- :param masternode_resource_preset: Resources preset (CPU+RAM configuration)
- for the primary node of the cluster.
- :param masternode_disk_size: Masternode storage size in GiB.
- :param masternode_disk_type: Masternode storage type. Possible options:
network-ssd, network-hdd.
- :param datanode_resource_preset: Resources preset (CPU+RAM configuration)
- for the data nodes of the cluster.
- :param datanode_disk_size: Datanodes storage size in GiB.
- :param datanode_disk_type: Datanodes storage type. Possible options:
network-ssd, network-hdd.
- :param computenode_resource_preset: Resources preset (CPU+RAM
configuration)
- for the compute nodes of the cluster.
- :param computenode_disk_size: Computenodes storage size in GiB.
- :param computenode_disk_type: Computenodes storage type. Possible options:
network-ssd, network-hdd.
- :param connection_id: ID of the Yandex.Cloud Airflow connection.
- :param computenode_max_count: Maximum number of nodes of compute
autoscaling subcluster.
- :param computenode_warmup_duration: The warmup time of the instance in
seconds. During this time,
- traffic is sent to the instance,
- but instance metrics are not
collected. In seconds.
- :param computenode_stabilization_duration: Minimum amount of time in
seconds for monitoring before
- Instance Groups can reduce the number of
instances in the group.
- During this time, the group size doesn't
decrease,
- even if the new metric values indicate that
it should. In seconds.
- :param computenode_preemptible: Preemptible instances are stopped at least
once every 24 hours,
- and can be stopped at any time if their resources are
needed by Compute.
- :param computenode_cpu_utilization_target: Defines an autoscaling rule
- based on the average CPU utilization of the
instance group.
- in percents. 10-100.
- By default is not set and default
autoscaling strategy is used.
- :param computenode_decommission_timeout: Timeout to gracefully
decommission nodes during downscaling.
- In seconds
- :param properties: Properties passed to main node software.
- Docs:
https://cloud.yandex.com/docs/data-proc/concepts/settings-list
- :param enable_ui_proxy: Enable UI Proxy feature for forwarding Hadoop
components web interfaces
- Docs:
https://cloud.yandex.com/docs/data-proc/concepts/ui-proxy
- :param host_group_ids: Dedicated host groups to place VMs of cluster on.
- Docs:
https://cloud.yandex.com/docs/compute/concepts/dedicated-host
- :param security_group_ids: User security groups.
- Docs:
https://cloud.yandex.com/docs/data-proc/concepts/network#security-groups
- :param log_group_id: Id of log group to write logs. By default logs will
be sent to default log group.
- To disable cloud log sending set cluster property
dataproc:disable_cloud_logging = true
- Docs: https://cloud.yandex.com/docs/data-proc/concepts/logs
- :param initialization_actions: Set of init-actions to run when cluster
starts.
- Docs:
https://cloud.yandex.com/docs/data-proc/concepts/init-action
- :param labels: Cluster labels as key:value pairs. No more than 64 per
resource.
- Docs:
https://cloud.yandex.com/docs/resource-manager/concepts/labels
- """
-
- def __init__(
- self,
- *,
- folder_id: str | None = None,
- cluster_name: str | None = None,
- cluster_description: str | None = "",
- cluster_image_version: str | None = None,
- ssh_public_keys: str | Iterable[str] | None = None,
- subnet_id: str | None = None,
-        services: Iterable[str] = ("HDFS", "YARN", "MAPREDUCE", "HIVE", "SPARK"),
- s3_bucket: str | None = None,
- zone: str = "ru-central1-b",
- service_account_id: str | None = None,
- masternode_resource_preset: str | None = None,
- masternode_disk_size: int | None = None,
- masternode_disk_type: str | None = None,
- datanode_resource_preset: str | None = None,
- datanode_disk_size: int | None = None,
- datanode_disk_type: str | None = None,
- datanode_count: int = 1,
- computenode_resource_preset: str | None = None,
- computenode_disk_size: int | None = None,
- computenode_disk_type: str | None = None,
- computenode_count: int = 0,
- computenode_max_hosts_count: int | None = None,
- computenode_measurement_duration: int | None = None,
- computenode_warmup_duration: int | None = None,
- computenode_stabilization_duration: int | None = None,
- computenode_preemptible: bool = False,
- computenode_cpu_utilization_target: int | None = None,
- computenode_decommission_timeout: int | None = None,
- connection_id: str | None = None,
- properties: dict[str, str] | None = None,
- enable_ui_proxy: bool = False,
- host_group_ids: Iterable[str] | None = None,
- security_group_ids: Iterable[str] | None = None,
- log_group_id: str | None = None,
- initialization_actions: Iterable[InitializationAction] | None = None,
- labels: dict[str, str] | None = None,
- **kwargs,
- ) -> None:
- super().__init__(**kwargs)
- self.folder_id = folder_id
- self.yandex_conn_id = connection_id
- self.cluster_name = cluster_name
- self.cluster_description = cluster_description
- self.cluster_image_version = cluster_image_version
- self.ssh_public_keys = ssh_public_keys
- self.subnet_id = subnet_id
- self.services = services
- self.s3_bucket = s3_bucket
- self.zone = zone
- self.service_account_id = service_account_id
- self.masternode_resource_preset = masternode_resource_preset
- self.masternode_disk_size = masternode_disk_size
- self.masternode_disk_type = masternode_disk_type
- self.datanode_resource_preset = datanode_resource_preset
- self.datanode_disk_size = datanode_disk_size
- self.datanode_disk_type = datanode_disk_type
- self.datanode_count = datanode_count
- self.computenode_resource_preset = computenode_resource_preset
- self.computenode_disk_size = computenode_disk_size
- self.computenode_disk_type = computenode_disk_type
- self.computenode_count = computenode_count
- self.computenode_max_hosts_count = computenode_max_hosts_count
-        self.computenode_measurement_duration = computenode_measurement_duration
-        self.computenode_warmup_duration = computenode_warmup_duration
-        self.computenode_stabilization_duration = computenode_stabilization_duration
-        self.computenode_preemptible = computenode_preemptible
-        self.computenode_cpu_utilization_target = computenode_cpu_utilization_target
-        self.computenode_decommission_timeout = computenode_decommission_timeout
- self.properties = properties
- self.enable_ui_proxy = enable_ui_proxy
- self.host_group_ids = host_group_ids
- self.security_group_ids = security_group_ids
- self.log_group_id = log_group_id
- self.initialization_actions = initialization_actions
- self.labels = labels
-
- self.hook: DataprocHook | None = None
-
- def execute(self, context: Context) -> dict:
- self.hook = DataprocHook(
- yandex_conn_id=self.yandex_conn_id,
- )
- operation_result = self.hook.client.create_cluster(
- folder_id=self.folder_id,
- cluster_name=self.cluster_name,
- cluster_description=self.cluster_description,
- cluster_image_version=self.cluster_image_version,
- ssh_public_keys=self.ssh_public_keys,
- subnet_id=self.subnet_id,
- services=self.services,
- s3_bucket=self.s3_bucket,
- zone=self.zone,
-            service_account_id=self.service_account_id or self.hook.default_service_account_id,
- masternode_resource_preset=self.masternode_resource_preset,
- masternode_disk_size=self.masternode_disk_size,
- masternode_disk_type=self.masternode_disk_type,
- datanode_resource_preset=self.datanode_resource_preset,
- datanode_disk_size=self.datanode_disk_size,
- datanode_disk_type=self.datanode_disk_type,
- datanode_count=self.datanode_count,
- computenode_resource_preset=self.computenode_resource_preset,
- computenode_disk_size=self.computenode_disk_size,
- computenode_disk_type=self.computenode_disk_type,
- computenode_count=self.computenode_count,
- computenode_max_hosts_count=self.computenode_max_hosts_count,
-            computenode_measurement_duration=self.computenode_measurement_duration,
-            computenode_warmup_duration=self.computenode_warmup_duration,
-            computenode_stabilization_duration=self.computenode_stabilization_duration,
-            computenode_preemptible=self.computenode_preemptible,
-            computenode_cpu_utilization_target=self.computenode_cpu_utilization_target,
-            computenode_decommission_timeout=self.computenode_decommission_timeout,
- properties=self.properties,
- enable_ui_proxy=self.enable_ui_proxy,
- host_group_ids=self.host_group_ids,
- security_group_ids=self.security_group_ids,
- log_group_id=self.log_group_id,
- labels=self.labels,
- initialization_actions=self.initialization_actions
- and [
- self.hook.sdk.wrappers.InitializationAction(
- uri=init_action.uri,
- args=init_action.args,
- timeout=init_action.timeout,
- )
- for init_action in self.initialization_actions
- ],
- )
- cluster_id = operation_result.response.id
-
- context["task_instance"].xcom_push(key="cluster_id", value=cluster_id)
- # Deprecated
- context["task_instance"].xcom_push(key="yandexcloud_connection_id",
value=self.yandex_conn_id)
- return cluster_id
-
- @property
- def cluster_id(self):
- return self.output
-
-
-class DataprocBaseOperator(BaseOperator):
- """Base class for DataProc operators working with given cluster.
-
- :param connection_id: ID of the Yandex.Cloud Airflow connection.
- :param cluster_id: ID of the cluster to remove. (templated)
- """
-
- template_fields: Sequence[str] = ("cluster_id",)
-
-    def __init__(self, *, yandex_conn_id: str | None = None, cluster_id: str | None = None, **kwargs) -> None:
- super().__init__(**kwargs)
- self.cluster_id = cluster_id
- self.yandex_conn_id = yandex_conn_id
-
-    def _setup(self, context: Context) -> DataprocHook:
-        if self.cluster_id is None:
-            self.cluster_id = context["task_instance"].xcom_pull(key="cluster_id")
-        if self.yandex_conn_id is None:
-            xcom_yandex_conn_id = context["task_instance"].xcom_pull(key="yandexcloud_connection_id")
-            if xcom_yandex_conn_id:
-                warnings.warn(
-                    "Implicit pass of `yandex_conn_id` is deprecated, please pass it explicitly",
-                    AirflowProviderDeprecationWarning,
-                    stacklevel=2,
-                )
-                self.yandex_conn_id = xcom_yandex_conn_id
-
-        return DataprocHook(yandex_conn_id=self.yandex_conn_id)
-
- def execute(self, context: Context):
- raise NotImplementedError()
-
-
-class DataprocDeleteClusterOperator(DataprocBaseOperator):
- """Deletes Yandex.Cloud Data Proc cluster.
-
- :param connection_id: ID of the Yandex.Cloud Airflow connection.
- :param cluster_id: ID of the cluster to remove. (templated)
- """
-
-    def __init__(self, *, connection_id: str | None = None, cluster_id: str | None = None, **kwargs) -> None:
-        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
-
- def execute(self, context: Context) -> None:
- hook = self._setup(context)
- hook.client.delete_cluster(self.cluster_id)
-
-
-class DataprocCreateHiveJobOperator(DataprocBaseOperator):
- """Runs Hive job in Data Proc cluster.
-
- :param query: Hive query.
-    :param query_file_uri: URI of the script that contains Hive queries. Can be placed in HDFS or S3.
-    :param properties: A mapping of property names to values, used to configure Hive.
-    :param script_variables: Mapping of query variable names to values.
-    :param continue_on_failure: Whether to continue executing queries if a query fails.
-    :param name: Name of the job. Used for labeling.
-    :param cluster_id: ID of the cluster to run job in.
-        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
- :param connection_id: ID of the Yandex.Cloud Airflow connection.
- """
-
- def __init__(
- self,
- *,
- query: str | None = None,
- query_file_uri: str | None = None,
- script_variables: dict[str, str] | None = None,
- continue_on_failure: bool = False,
- properties: dict[str, str] | None = None,
- name: str = "Hive job",
- cluster_id: str | None = None,
- connection_id: str | None = None,
- **kwargs,
- ) -> None:
-        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
- self.query = query
- self.query_file_uri = query_file_uri
- self.script_variables = script_variables
- self.continue_on_failure = continue_on_failure
- self.properties = properties
- self.name = name
-
- def execute(self, context: Context) -> None:
- hook = self._setup(context)
- hook.client.create_hive_job(
- query=self.query,
- query_file_uri=self.query_file_uri,
- script_variables=self.script_variables,
- continue_on_failure=self.continue_on_failure,
- properties=self.properties,
- name=self.name,
- cluster_id=self.cluster_id,
- )
-
-
-class DataprocCreateMapReduceJobOperator(DataprocBaseOperator):
- """Runs Mapreduce job in Data Proc cluster.
-
-    :param main_jar_file_uri: URI of jar file with job.
-        Can be placed in HDFS or S3. Can be specified instead of main_class.
-    :param main_class: Name of the main class of the job. Can be specified instead of main_jar_file_uri.
-    :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3.
-    :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3.
-    :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3.
-    :param properties: Properties for the job.
-    :param args: Arguments to be passed to the job.
-    :param name: Name of the job. Used for labeling.
-    :param cluster_id: ID of the cluster to run job in.
-        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
- :param connection_id: ID of the Yandex.Cloud Airflow connection.
- """
-
- def __init__(
- self,
- *,
- main_class: str | None = None,
- main_jar_file_uri: str | None = None,
- jar_file_uris: Iterable[str] | None = None,
- archive_uris: Iterable[str] | None = None,
- file_uris: Iterable[str] | None = None,
- args: Iterable[str] | None = None,
- properties: dict[str, str] | None = None,
- name: str = "Mapreduce job",
- cluster_id: str | None = None,
- connection_id: str | None = None,
- **kwargs,
- ) -> None:
-        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
- self.main_class = main_class
- self.main_jar_file_uri = main_jar_file_uri
- self.jar_file_uris = jar_file_uris
- self.archive_uris = archive_uris
- self.file_uris = file_uris
- self.args = args
- self.properties = properties
- self.name = name
-
- def execute(self, context: Context) -> None:
- hook = self._setup(context)
- hook.client.create_mapreduce_job(
- main_class=self.main_class,
- main_jar_file_uri=self.main_jar_file_uri,
- jar_file_uris=self.jar_file_uris,
- archive_uris=self.archive_uris,
- file_uris=self.file_uris,
- args=self.args,
- properties=self.properties,
- name=self.name,
- cluster_id=self.cluster_id,
- )
-
-
-class DataprocCreateSparkJobOperator(DataprocBaseOperator):
- """Runs Spark job in Data Proc cluster.
-
-    :param main_jar_file_uri: URI of jar file with job. Can be placed in HDFS or S3.
-    :param main_class: Name of the main class of the job.
-    :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3.
-    :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3.
-    :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3.
-    :param properties: Properties for the job.
-    :param args: Arguments to be passed to the job.
-    :param name: Name of the job. Used for labeling.
-    :param cluster_id: ID of the cluster to run job in.
-        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
-    :param connection_id: ID of the Yandex.Cloud Airflow connection.
-    :param packages: List of maven coordinates of jars to include on the driver and executor classpaths.
-    :param repositories: List of additional remote repositories to search for the maven coordinates
-        given with --packages.
-    :param exclude_packages: List of groupId:artifactId, to exclude while resolving the dependencies
-        provided in --packages to avoid dependency conflicts.
- """
-
- def __init__(
- self,
- *,
- main_class: str | None = None,
- main_jar_file_uri: str | None = None,
- jar_file_uris: Iterable[str] | None = None,
- archive_uris: Iterable[str] | None = None,
- file_uris: Iterable[str] | None = None,
- args: Iterable[str] | None = None,
- properties: dict[str, str] | None = None,
- name: str = "Spark job",
- cluster_id: str | None = None,
- connection_id: str | None = None,
- packages: Iterable[str] | None = None,
- repositories: Iterable[str] | None = None,
- exclude_packages: Iterable[str] | None = None,
- **kwargs,
- ) -> None:
-        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
- self.main_class = main_class
- self.main_jar_file_uri = main_jar_file_uri
- self.jar_file_uris = jar_file_uris
- self.archive_uris = archive_uris
- self.file_uris = file_uris
- self.args = args
- self.properties = properties
- self.name = name
- self.packages = packages
- self.repositories = repositories
- self.exclude_packages = exclude_packages
-
- def execute(self, context: Context) -> None:
- hook = self._setup(context)
- hook.client.create_spark_job(
- main_class=self.main_class,
- main_jar_file_uri=self.main_jar_file_uri,
- jar_file_uris=self.jar_file_uris,
- archive_uris=self.archive_uris,
- file_uris=self.file_uris,
- args=self.args,
- properties=self.properties,
- packages=self.packages,
- repositories=self.repositories,
- exclude_packages=self.exclude_packages,
- name=self.name,
- cluster_id=self.cluster_id,
- )
-
-
-class DataprocCreatePysparkJobOperator(DataprocBaseOperator):
- """Runs Pyspark job in Data Proc cluster.
-
-    :param main_python_file_uri: URI of python file with job. Can be placed in HDFS or S3.
-    :param python_file_uris: URIs of python files used in the job. Can be placed in HDFS or S3.
-    :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3.
-    :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3.
-    :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3.
-    :param properties: Properties for the job.
-    :param args: Arguments to be passed to the job.
-    :param name: Name of the job. Used for labeling.
-    :param cluster_id: ID of the cluster to run job in.
-        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
-    :param connection_id: ID of the Yandex.Cloud Airflow connection.
-    :param packages: List of maven coordinates of jars to include on the driver and executor classpaths.
-    :param repositories: List of additional remote repositories to search for the maven coordinates
-        given with --packages.
-    :param exclude_packages: List of groupId:artifactId, to exclude while resolving the dependencies
-        provided in --packages to avoid dependency conflicts.
- """
-
- def __init__(
- self,
- *,
- main_python_file_uri: str | None = None,
- python_file_uris: Iterable[str] | None = None,
- jar_file_uris: Iterable[str] | None = None,
- archive_uris: Iterable[str] | None = None,
- file_uris: Iterable[str] | None = None,
- args: Iterable[str] | None = None,
- properties: dict[str, str] | None = None,
- name: str = "Pyspark job",
- cluster_id: str | None = None,
- connection_id: str | None = None,
- packages: Iterable[str] | None = None,
- repositories: Iterable[str] | None = None,
- exclude_packages: Iterable[str] | None = None,
- **kwargs,
- ) -> None:
-        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
- self.main_python_file_uri = main_python_file_uri
- self.python_file_uris = python_file_uris
- self.jar_file_uris = jar_file_uris
- self.archive_uris = archive_uris
- self.file_uris = file_uris
- self.args = args
- self.properties = properties
- self.name = name
- self.packages = packages
- self.repositories = repositories
- self.exclude_packages = exclude_packages
+from airflow.providers.yandex.operators.dataproc import * # noqa: F403
- def execute(self, context: Context) -> None:
- hook = self._setup(context)
- hook.client.create_pyspark_job(
- main_python_file_uri=self.main_python_file_uri,
- python_file_uris=self.python_file_uris,
- jar_file_uris=self.jar_file_uris,
- archive_uris=self.archive_uris,
- file_uris=self.file_uris,
- args=self.args,
- properties=self.properties,
- packages=self.packages,
- repositories=self.repositories,
- exclude_packages=self.exclude_packages,
- name=self.name,
- cluster_id=self.cluster_id,
- )
+warnings.warn(
+ "This module is deprecated. Please use
`airflow.providers.yandex.operators.dataproc` instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+)
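The operators module gets the same shim treatment as the hooks module above: the implementation now lives in airflow.providers.yandex.operators.dataproc, and the old module re-exports it while warning on import. A minimal migration sketch using one of the operators named in this diff:

    # deprecated path - still re-exports the operators, but warns on import
    from airflow.providers.yandex.operators.yandexcloud_dataproc import DataprocCreateClusterOperator

    # new path introduced by this commit
    from airflow.providers.yandex.operators.dataproc import DataprocCreateClusterOperator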
diff --git a/airflow/providers/yandex/provider.yaml b/airflow/providers/yandex/provider.yaml
index efbf4bba93..764a08586f 100644
--- a/airflow/providers/yandex/provider.yaml
+++ b/airflow/providers/yandex/provider.yaml
@@ -50,7 +50,7 @@ versions:
dependencies:
- apache-airflow>=2.7.0
- - yandexcloud>=0.228.0
+ - yandexcloud>=0.278.0
- yandex-query-client>=0.1.4
integrations:
@@ -76,8 +76,7 @@ integrations:
operators:
- integration-name: Yandex.Cloud Dataproc
python-modules:
- - airflow.providers.yandex.operators.yandexcloud_dataproc
-
+ - airflow.providers.yandex.operators.dataproc
- integration-name: Yandex.Cloud YQ
python-modules:
- airflow.providers.yandex.operators.yq
@@ -88,7 +87,7 @@ hooks:
- airflow.providers.yandex.hooks.yandex
- integration-name: Yandex.Cloud Dataproc
python-modules:
- - airflow.providers.yandex.hooks.yandexcloud_dataproc
+ - airflow.providers.yandex.hooks.dataproc
- integration-name: Yandex.Cloud YQ
python-modules:
- airflow.providers.yandex.hooks.yq
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index d2c0896ba8..f10e93060e 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -1199,7 +1199,7 @@
"deps": [
"apache-airflow>=2.7.0",
"yandex-query-client>=0.1.4",
- "yandexcloud>=0.228.0"
+ "yandexcloud>=0.278.0"
],
"devel-deps": [],
"cross-providers-deps": [],
diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py
index 4f74846147..d3cf8b1e2e 100755
--- a/scripts/in_container/run_provider_yaml_files_check.py
+++ b/scripts/in_container/run_provider_yaml_files_check.py
@@ -51,6 +51,8 @@ DEPRECATED_MODULES = [
"airflow.providers.cncf.kubernetes.triggers.kubernetes_pod",
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod",
"airflow.providers.tabular.hooks.tabular",
+ "airflow.providers.yandex.hooks.yandexcloud_dataproc",
+ "airflow.providers.yandex.operators.yandexcloud_dataproc",
]
KNOWN_DEPRECATED_CLASSES = [
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 428570ee58..ba4a467351 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -159,6 +159,8 @@ class TestProjectStructure:
"tests/providers/redis/sensors/test_redis_key.py",
"tests/providers/slack/notifications/test_slack_notifier.py",
"tests/providers/snowflake/triggers/test_snowflake_trigger.py",
+ "tests/providers/yandex/hooks/test_yandexcloud_dataproc.py",
+ "tests/providers/yandex/operators/test_yandexcloud_dataproc.py",
]
# TODO: Should we extend this test to cover other directories?
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index e63df14631..d6400074e6 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -691,8 +691,4 @@
 - tests/providers/weaviate/operators/test_weaviate.py::TestWeaviateIngestOperator::test_execute_with_input_json
 - tests/providers/yandex/hooks/test_yandex.py::TestYandexHook::test_provider_user_agent
 - tests/providers/yandex/hooks/test_yandex.py::TestYandexHook::test_backcompat_prefix_works
-- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_hive_job_operator
-- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_mapreduce_job_operator
-- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_spark_job_operator
-- tests/providers/yandex/operators/test_yandexcloud_dataproc.py::TestDataprocClusterCreateOperator::test_create_pyspark_job_operator
 - tests/providers/yandex/secrets/test_lockbox.py::TestLockboxSecretBackend::test_yandex_lockbox_secret_backend_get_connection_from_json
diff --git a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py b/tests/providers/yandex/hooks/test_dataproc.py
similarity index 99%
rename from tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
rename to tests/providers/yandex/hooks/test_dataproc.py
index 5629a3678e..0665b17b47 100644
--- a/tests/providers/yandex/hooks/test_yandexcloud_dataproc.py
+++ b/tests/providers/yandex/hooks/test_dataproc.py
@@ -20,7 +20,7 @@ import json
from unittest import mock
from airflow.models import Connection
-from airflow.providers.yandex.hooks.yandexcloud_dataproc import DataprocHook
+from airflow.providers.yandex.hooks.dataproc import DataprocHook
# Airflow connection with type "yandexcloud" must be created
CONNECTION_ID = "yandexcloud_default"
diff --git a/tests/providers/yandex/hooks/test_yandex.py b/tests/providers/yandex/hooks/test_yandex.py
index ea885c6b3f..ae6dac568b 100644
--- a/tests/providers/yandex/hooks/test_yandex.py
+++ b/tests/providers/yandex/hooks/test_yandex.py
@@ -40,7 +40,7 @@ class TestYandexHook:
mock_get_connection["extra_dejson"] = "sds"
mock_get_connection.extra_dejson = '{"extras": "extra"}'
mock_get_connection.return_value = mock.Mock(
- connection_id="yandexcloud_default", extra_dejson=extra_dejson
+ yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson
)
mock_get_credentials.return_value = {"token": 122323}
@@ -54,7 +54,7 @@ class TestYandexHook:
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
@mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
     def test_provider_user_agent(self, mock_get_credentials, mock_get_connection):
-        mock_get_connection.return_value = mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
+        mock_get_connection.return_value = mock.Mock(yandex_conn_id="yandexcloud_default", extra_dejson="{}")
mock_get_credentials.return_value = {"token": 122323}
sdk_prefix = "MyAirflow"
@@ -65,7 +65,7 @@ class TestYandexHook:
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
@mock.patch("airflow.providers.yandex.utils.credentials.get_credentials")
def test_sdk_user_agent(self, mock_get_credentials, mock_get_connection):
-        mock_get_connection.return_value = mock.Mock(connection_id="yandexcloud_default", extra_dejson="{}")
+        mock_get_connection.return_value = mock.Mock(yandex_conn_id="yandexcloud_default", extra_dejson="{}")
mock_get_credentials.return_value = {"token": 122323}
sdk_prefix = "MyAirflow"
@@ -97,7 +97,7 @@ class TestYandexHook:
extra_dejson = {"endpoint": "my_endpoint", "something_else":
"some_value"}
mock_get_connection.return_value = mock.Mock(
- connection_id="yandexcloud_default", extra_dejson=extra_dejson
+ yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson
)
mock_get_credentials.return_value = {"token": 122323}
@@ -117,7 +117,7 @@ class TestYandexHook:
extra_dejson = {"something_else": "some_value"}
mock_get_connection.return_value = mock.Mock(
- connection_id="yandexcloud_default", extra_dejson=extra_dejson
+ yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson
)
mock_get_credentials.return_value = {"token": 122323}
@@ -140,7 +140,7 @@ class TestYandexHook:
mock_get_connection["extra_dejson"] = "sds"
mock_get_connection.extra_dejson = '{"extras": "extra"}'
mock_get_connection.return_value = mock.Mock(
- connection_id="yandexcloud_default", extra_dejson=extra_dejson
+ yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson
)
hook = YandexCloudBaseHook(
@@ -163,7 +163,7 @@ class TestYandexHook:
get_connection_mock["extra_dejson"] = "sds"
get_connection_mock.extra_dejson = '{"extras": "extra"}'
get_connection_mock.return_value = mock.Mock(
- connection_id="yandexcloud_default", extra_dejson=extra_dejson
+ yandex_conn_id="yandexcloud_default", extra_dejson=extra_dejson
)
hook = YandexCloudBaseHook()
diff --git a/tests/providers/yandex/hooks/test_yq.py b/tests/providers/yandex/hooks/test_yq.py
index 2864642c0e..71723d97b8 100644
--- a/tests/providers/yandex/hooks/test_yq.py
+++ b/tests/providers/yandex/hooks/test_yq.py
@@ -31,20 +31,11 @@ IAM_TOKEN = "my_iam_token"
SERVICE_ACCOUNT_AUTH_KEY_JSON = """{"id":"my_id",
"service_account_id":"my_sa1", "private_key":"my_pk"}"""
-class FakeTokenRequester:
- def get_token(self) -> str:
- return IAM_TOKEN
-
- def get_token_request(self) -> str:
- return "my_dummy_request"
-
-
class TestYandexCloudYqHook:
def _init_hook(self):
with mock.patch("airflow.hooks.base.BaseHook.get_connection") as
mock_get_connection:
mock_get_connection.return_value = self.connection
- with
mock.patch("airflow.providers.yandex.hooks.yq.yandexcloud.SDK.client"):
- self.hook = YQHook(default_folder_id="my_folder_id")
+ self.hook = YQHook(default_folder_id="my_folder_id")
def setup_method(self):
self.connection = Connection(extra={"service_account_json":
SERVICE_ACCOUNT_AUTH_KEY_JSON})
@@ -74,8 +65,8 @@ class TestYandexCloudYqHook:
m.assert_called_once_with("query1")
@responses.activate()
- @mock.patch("yandexcloud._auth_fabric.get_auth_token_requester",
return_value=FakeTokenRequester())
- def test_metadata_token_usage(self, mock_get_auth_token_requester):
+ @mock.patch("yandexcloud.auth.get_auth_token", return_value=IAM_TOKEN)
+ def test_metadata_token_usage(self, mock_get_auth_token):
responses.post(
"https://api.yandex-query.cloud.yandex.net/api/fq/v1/queries",
match=[
@@ -94,8 +85,8 @@ class TestYandexCloudYqHook:
assert query_id == "query1"
@mock.patch("yandexcloud._auth_fabric.__validate_service_account_key")
- @mock.patch("yandexcloud._auth_fabric.get_auth_token_requester",
return_value=FakeTokenRequester())
- def test_select_results(self, mock_get_auth_token_requester,
mock_validate):
+ @mock.patch("yandexcloud.auth.get_auth_token", return_value=IAM_TOKEN)
+ def test_select_results(self, mock_get_auth_token, mock_validate):
with mock.patch.multiple(
"yandex_query_client.YQHttpClient",
create_query=mock.DEFAULT,
@@ -107,7 +98,7 @@ class TestYandexCloudYqHook:
) as mocks:
self._init_hook()
mock_validate.assert_called()
- mock_get_auth_token_requester.assert_called_once_with(
+ mock_get_auth_token.assert_called_once_with(
service_account_key=json.loads(SERVICE_ACCOUNT_AUTH_KEY_JSON)
)
diff --git a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py b/tests/providers/yandex/operators/test_dataproc.py
similarity index 98%
rename from tests/providers/yandex/operators/test_yandexcloud_dataproc.py
rename to tests/providers/yandex/operators/test_dataproc.py
index 0a055dd377..9711ba55ee 100644
--- a/tests/providers/yandex/operators/test_yandexcloud_dataproc.py
+++ b/tests/providers/yandex/operators/test_dataproc.py
@@ -20,7 +20,7 @@ import datetime
from unittest.mock import MagicMock, call, patch
from airflow.models.dag import DAG
-from airflow.providers.yandex.operators.yandexcloud_dataproc import (
+from airflow.providers.yandex.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocCreateHiveJobOperator,
DataprocCreateMapReduceJobOperator,
@@ -161,6 +161,7 @@ class TestDataprocClusterCreateOperator:
operator = DataprocCreateHiveJobOperator(
task_id="create_hive_job",
query="SELECT 1;",
+ connection_id=CONNECTION_ID,
)
context = {"task_instance": MagicMock()}
context["task_instance"].xcom_pull.return_value = "my_cluster_id"
@@ -169,7 +170,6 @@ class TestDataprocClusterCreateOperator:
context["task_instance"].xcom_pull.assert_has_calls(
[
call(key="cluster_id"),
- call(key="yandexcloud_connection_id"),
]
)
@@ -189,6 +189,7 @@ class TestDataprocClusterCreateOperator:
    def test_create_mapreduce_job_operator(self, mock_create_mapreduce_job, *_):
operator = DataprocCreateMapReduceJobOperator(
task_id="run_mapreduce_job",
+ connection_id=CONNECTION_ID,
main_class="org.apache.hadoop.streaming.HadoopStreaming",
file_uris=[
"s3a://some-in-bucket/jobs/sources/mapreduce-001/mapper.py",
@@ -219,7 +220,6 @@ class TestDataprocClusterCreateOperator:
context["task_instance"].xcom_pull.assert_has_calls(
[
call(key="cluster_id"),
- call(key="yandexcloud_connection_id"),
]
)
@@ -259,6 +259,7 @@ class TestDataprocClusterCreateOperator:
def test_create_spark_job_operator(self, mock_create_spark_job, *_):
operator = DataprocCreateSparkJobOperator(
task_id="create_spark_job",
+ connection_id=CONNECTION_ID,
main_jar_file_uri="s3a://data-proc-public/jobs/sources/java/dataproc-examples-1.0.jar",
main_class="ru.yandex.cloud.dataproc.examples.PopulationSparkJob",
file_uris=[
@@ -288,7 +289,6 @@ class TestDataprocClusterCreateOperator:
context["task_instance"].xcom_pull.assert_has_calls(
[
call(key="cluster_id"),
- call(key="yandexcloud_connection_id"),
]
)
@@ -321,6 +321,7 @@ class TestDataprocClusterCreateOperator:
def test_create_pyspark_job_operator(self, mock_create_pyspark_job, *_):
operator = DataprocCreatePysparkJobOperator(
task_id="create_pyspark_job",
+ connection_id=CONNECTION_ID,
main_python_file_uri="s3a://some-in-bucket/jobs/sources/pyspark-001/main.py",
python_file_uris=[
"s3a://some-in-bucket/jobs/sources/pyspark-001/geonames.py",
@@ -351,7 +352,6 @@ class TestDataprocClusterCreateOperator:
context["task_instance"].xcom_pull.assert_has_calls(
[
call(key="cluster_id"),
- call(key="yandexcloud_connection_id"),
]
)
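As the test changes above suggest, the job operators are now constructed with an explicit connection_id, so the deprecated implicit XCom lookup of yandexcloud_connection_id is never triggered (which is also why the corresponding deprecations_ignore.yml entries could be dropped). A hedged sketch mirroring the updated tests; CONNECTION_ID matches the value used in the provider tests:

    from airflow.providers.yandex.operators.dataproc import DataprocCreateHiveJobOperator

    CONNECTION_ID = "yandexcloud_default"  # as in the provider tests

    # Passing connection_id explicitly avoids the deprecated implicit
    # "yandexcloud_connection_id" XCom fallback in DataprocBaseOperator._setup().
    hive_job = DataprocCreateHiveJobOperator(
        task_id="create_hive_job",
        query="SELECT 1;",
        connection_id=CONNECTION_ID,
    )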
diff --git a/tests/system/providers/yandex/example_yandexcloud_dataproc.py b/tests/system/providers/yandex/example_yandexcloud_dataproc.py
index cfae4e94e0..7ff4aa541d 100644
--- a/tests/system/providers/yandex/example_yandexcloud_dataproc.py
+++ b/tests/system/providers/yandex/example_yandexcloud_dataproc.py
@@ -20,7 +20,7 @@ import uuid
from datetime import datetime
from airflow import DAG
-from airflow.providers.yandex.operators.yandexcloud_dataproc import (
+from airflow.providers.yandex.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocCreateHiveJobOperator,
DataprocCreateMapReduceJobOperator,
diff --git a/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py b/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py
index fa5a3c758b..475bc789ec 100644
--- a/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py
+++ b/tests/system/providers/yandex/example_yandexcloud_dataproc_lightweight.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from datetime import datetime
from airflow import DAG
-from airflow.providers.yandex.operators.yandexcloud_dataproc import (
+from airflow.providers.yandex.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocCreateSparkJobOperator,
DataprocDeleteClusterOperator,