This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dfb2403ec4 Add Dataplex Data Quality operators. (#32256)
dfb2403ec4 is described below
commit dfb2403ec4b6d147ac31125631677cee9e12347e
Author: Beata Kossakowska <[email protected]>
AuthorDate: Mon Aug 14 00:21:14 2023 +0200
Add Dataplex Data Quality operators. (#32256)
---------
Co-authored-by: Beata Kossakowska <[email protected]>
---
airflow/providers/google/cloud/hooks/dataplex.py | 500 +++++++++++-
.../providers/google/cloud/operators/dataplex.py | 859 ++++++++++++++++++++-
airflow/providers/google/cloud/sensors/dataplex.py | 126 ++-
.../operators/cloud/dataplex.rst | 165 +++-
docs/spelling_wordlist.txt | 3 +
.../providers/google/cloud/hooks/test_dataplex.py | 155 +++-
.../google/cloud/operators/test_dataplex.py | 274 +++++++
.../google/cloud/sensors/test_dataplex.py | 47 +-
.../google/cloud/dataplex/example_dataplex_dq.py | 343 ++++++++
9 files changed, 2454 insertions(+), 18 deletions(-)
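Note for reviewers: the new hooks and operators below all take a DataScan
`body`. A minimal sketch of such a body, assuming the field names of
google.cloud.dataplex_v1.types.DataScan; the BigQuery resource, column name,
and rule are illustrative placeholders, not values from this commit:

EXAMPLE_DATA_SCAN_BODY = {
    "data": {
        # Hypothetical BigQuery table to scan; replace with a real resource.
        "resource": "//bigquery.googleapis.com/projects/my-project/datasets/my_dataset/tables/my_table",
    },
    "data_quality_spec": {
        "rules": [
            {
                "column": "id",  # hypothetical column
                "dimension": "VALIDITY",
                "non_null_expectation": {},
            }
        ]
    },
}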
diff --git a/airflow/providers/google/cloud/hooks/dataplex.py
b/airflow/providers/google/cloud/hooks/dataplex.py
index 405870fd31..5fbfb0bd6e 100644
--- a/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -17,20 +17,39 @@
"""This module contains Google Dataplex hook."""
from __future__ import annotations
+import time
from typing import Any, Sequence
from google.api_core.client_options import ClientOptions
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.operation import Operation
from google.api_core.retry import Retry
-from google.cloud.dataplex_v1 import DataplexServiceClient
-from google.cloud.dataplex_v1.types import Lake, Task
+from google.cloud.dataplex_v1 import DataplexServiceClient, DataScanServiceClient
+from google.cloud.dataplex_v1.types import (
+ Asset,
+ DataScan,
+ DataScanJob,
+ Lake,
+ Task,
+ Zone,
+)
+from google.protobuf.field_mask_pb2 import FieldMask
from googleapiclient.discovery import Resource
from airflow.exceptions import AirflowException
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
+
+
+class AirflowDataQualityScanException(AirflowException):
+ """Raised when data quality scan rules fail."""
+
+
+class AirflowDataQualityScanResultTimeoutException(AirflowException):
+ """Raised when no result found after specified amount of seconds."""
+
class DataplexHook(GoogleBaseHook):
"""
@@ -55,6 +74,7 @@ class DataplexHook(GoogleBaseHook):
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
+ location: str | None = None,
**kwargs,
) -> None:
if kwargs.get("delegate_to") is not None:
@@ -67,6 +87,7 @@ class DataplexHook(GoogleBaseHook):
impersonation_chain=impersonation_chain,
)
self.api_version = api_version
+ self.location = location
def get_dataplex_client(self) -> DataplexServiceClient:
"""Returns DataplexServiceClient."""
@@ -76,6 +97,14 @@ class DataplexHook(GoogleBaseHook):
credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
)
+ def get_dataplex_data_scan_client(self) -> DataScanServiceClient:
+ """Returns DataScanServiceClient."""
+        client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443")
+
+ return DataScanServiceClient(
+            credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+ )
+
def wait_for_operation(self, timeout: float | None, operation: Operation):
"""Waits for long-lasting operation to complete."""
try:
@@ -361,3 +390,470 @@ class DataplexHook(GoogleBaseHook):
metadata=metadata,
)
return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_zone(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ zone_id: str,
+ body: dict[str, Any] | Zone,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Creates a zone resource within a lake.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake in which the zone will be created.
+ :param body: Required. The Request body contains an instance of Zone.
+ :param zone_id: Required. Zone identifier.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_client()
+
+ name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
+ result = client.create_zone(
+ request={
+ "parent": name,
+ "zone": body,
+ "zone_id": zone_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def delete_zone(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ zone_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Deletes a zone resource. All assets within a zone must be deleted
before the zone can be deleted.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake the zone belongs to.
+ :param zone_id: Required. Zone identifier.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_client()
+
+        name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}/zones/{zone_id}"
+ operation = client.delete_zone(
+ request={"name": name},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return operation
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_asset(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ zone_id: str,
+ asset_id: str,
+ body: dict[str, Any] | Asset,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Creates an asset resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake the asset belongs to.
+ :param zone_id: Required. Zone identifier.
+ :param asset_id: Required. Asset identifier.
+ :param body: Required. The Request body contains an instance of Asset.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_client()
+
+        name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}/zones/{zone_id}"
+ result = client.create_asset(
+ request={
+ "parent": name,
+ "asset": body,
+ "asset_id": asset_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def delete_asset(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ asset_id: str,
+ zone_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Deletes an asset resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake the asset belongs to.
+ :param zone_id: Required. Zone identifier.
+ :param asset_id: Required. Asset identifier.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_client()
+
+        name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}/zones/{zone_id}/assets/{asset_id}"
+ result = client.delete_asset(
+ request={"name": name},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_data_scan(
+ self,
+ project_id: str,
+ region: str,
+ body: dict[str, Any] | DataScan,
+ data_scan_id: str | None = None,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Creates a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param body: Required. The Request body contains an instance of
DataScan.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_data_scan_client()
+
+ parent = f"projects/{project_id}/locations/{region}"
+ result = client.create_data_scan(
+ request={
+ "parent": parent,
+ "data_scan": body,
+ "data_scan_id": data_scan_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def run_data_scan(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Runs an on-demand execution of a DataScan.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, data_scan_id=data_scan_id)
+ result = client.run_data_scan(
+ request={
+ "name": name,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_data_scan_job(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str | None = None,
+ job_id: str | None = None,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Gets a DataScan Job resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param job_id: Required. The resource name of the DataScanJob:
+            projects/{project_id}/locations/{region}/dataScans/{data_scan_id}/jobs/{data_scan_job_id}
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_data_scan_client()
+
+        name = f"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}/jobs/{job_id}"
+ result = client.get_data_scan_job(
+ request={"name": name, "view": "FULL"},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ def wait_for_data_scan_job(
+ self,
+ data_scan_id: str,
+ job_id: str | None = None,
+ project_id: str | None = None,
+ region: str | None = None,
+ wait_time: int = 10,
+ result_timeout: float | None = None,
+ ) -> Any:
+ """
+ Wait for Dataplex data scan job.
+
+ :param job_id: Required. The job_id to wait for.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param project_id: Optional. Google Cloud project ID.
+ :param wait_time: Number of seconds between checks.
+ :param result_timeout: Value in seconds for which operator will wait
for the Data Quality scan result.
+ Throws exception if there is no result found after specified
amount of seconds.
+ """
+ start = time.monotonic()
+ state = None
+ while state not in (
+ DataScanJob.State.CANCELLED,
+ DataScanJob.State.FAILED,
+ DataScanJob.State.SUCCEEDED,
+ ):
+ if result_timeout and start + result_timeout < time.monotonic():
+ raise AirflowDataQualityScanResultTimeoutException(
+ f"Timeout: Data Quality scan {job_id} is not ready after
{result_timeout}s"
+ )
+ time.sleep(wait_time)
+ try:
+ job = self.get_data_scan_job(
+ job_id=job_id,
+ data_scan_id=data_scan_id,
+ project_id=project_id,
+ region=region,
+ )
+ state = job.state
+ except Exception as err:
+ self.log.info("Retrying. Dataplex API returned error when
waiting for job: %s", err)
+ return job
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_data_scan(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Gets a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, data_scan_id=data_scan_id)
+ result = client.get_data_scan(
+ request={"name": name, "view": "FULL"},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def update_data_scan(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ body: dict[str, Any] | DataScan,
+ update_mask: dict | FieldMask | None = None,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Updates a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param body: Required. The Request body contains an instance of
DataScan.
+        :param update_mask: Optional. Mask of fields to update. If not provided, a default mask covering the mutable fields is used.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_data_scan_client()
+
+        full_scan_name = f"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
+
+ if body:
+ if isinstance(body, DataScan):
+ body.name = full_scan_name
+ elif isinstance(body, dict):
+ body["name"] = full_scan_name
+ else:
+ raise AirflowException("Unable to set scan_name.")
+
+ if not update_mask:
+ update_mask = FieldMask(
+ paths=["data_quality_spec", "labels", "description",
"display_name", "execution_spec"]
+ )
+
+ result = client.update_data_scan(
+ request={
+ "data_scan": body,
+ "update_mask": update_mask,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def delete_data_scan(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Deletes a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, data_scan_id=data_scan_id)
+ result = client.delete_data_scan(
+ request={
+ "name": name,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def list_data_scan_jobs(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ ) -> Any:
+ """
+ Lists DataScanJobs under the given DataScan.
+
+ :param project_id: Required. The ID of the Google Cloud project that
the lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the
lake belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the
request to complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ """
+ client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, data_scan_id=data_scan_id)
+ result = client.list_data_scan_jobs(
+ request={
+ "parent": name,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
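For orientation, a minimal sketch of how the new hook methods chain together
(not part of this commit; the project, region, and scan id are placeholders,
and EXAMPLE_DATA_SCAN_BODY stands for any valid DataScan dict, such as the
illustrative one sketched after the diffstat above):

from airflow.providers.google.cloud.hooks.dataplex import DataplexHook

hook = DataplexHook(gcp_conn_id="google_cloud_default")

# Create the scan and block until the long-running operation finishes.
operation = hook.create_data_scan(
    project_id="my-project",
    region="us-central1",
    data_scan_id="my-scan",
    body=EXAMPLE_DATA_SCAN_BODY,
)
hook.wait_for_operation(timeout=300, operation=operation)

# Trigger an on-demand run, then poll until the job reaches a terminal state.
response = hook.run_data_scan(project_id="my-project", region="us-central1", data_scan_id="my-scan")
job_id = response.job.name.split("/")[-1]
job = hook.wait_for_data_scan_job(
    data_scan_id="my-scan",
    job_id=job_id,
    project_id="my-project",
    region="us-central1",
)
print(job.state, job.data_quality_result.passed)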
diff --git a/airflow/providers/google/cloud/operators/dataplex.py
b/airflow/providers/google/cloud/operators/dataplex.py
index 7b50523e54..2b27cb70db 100644
--- a/airflow/providers/google/cloud/operators/dataplex.py
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -21,15 +21,19 @@ from __future__ import annotations
from time import sleep
from typing import TYPE_CHECKING, Any, Sequence
+from airflow import AirflowException
+
if TYPE_CHECKING:
from airflow.utils.context import Context
+from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry, exponential_sleep_generator
-from google.cloud.dataplex_v1.types import Lake, Task
+from google.cloud.dataplex_v1.types import Asset, DataScan, DataScanJob, Lake, Task, Zone
+from google.protobuf.field_mask_pb2 import FieldMask
from googleapiclient.errors import HttpError
-from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.providers.google.cloud.hooks.dataplex import AirflowDataQualityScanException, DataplexHook
from airflow.providers.google.cloud.links.dataplex import (
DataplexLakeLink,
DataplexTaskLink,
@@ -440,8 +444,7 @@ class DataplexCreateLakeOperator(GoogleCloudBaseOperator):
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
:param asynchronous: Flag informing should the Dataplex lake be created
asynchronously.
- This is useful for long running creating lakes and
- waiting on them asynchronously using the DataplexLakeSensor
+        This is useful for long-running lake creation.
"""
template_fields = (
@@ -590,7 +593,6 @@ class DataplexDeleteLakeOperator(GoogleCloudBaseOperator):
self.impersonation_chain = impersonation_chain
def execute(self, context: Context) -> None:
-
hook = DataplexHook(
gcp_conn_id=self.gcp_conn_id,
api_version=self.api_version,
@@ -610,3 +612,850 @@ class DataplexDeleteLakeOperator(GoogleCloudBaseOperator):
DataplexLakeLink.persist(context=context, task_instance=self)
hook.wait_for_operation(timeout=self.timeout, operation=operation)
self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
+
+
+class DataplexCreateOrUpdateDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Creates a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param body: Required. The Request body contains an instance of DataScan.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param update_mask: Mask of fields to update.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Dataplex data scan id
+ """
+
+ template_fields = ("project_id", "data_scan_id", "body",
"impersonation_chain")
+ template_fields_renderers = {"body": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ body: dict[str, Any] | DataScan,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ update_mask: dict | FieldMask | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.body = body
+ self.update_mask = update_mask
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Creating Dataplex Data Quality scan %s",
self.data_scan_id)
+ try:
+ operation = hook.create_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ body=self.body,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex Data Quality scan %s created
successfully!", self.data_scan_id)
+ except AlreadyExists:
+ self.log.info("Dataplex Data Quality scan already exists: %s",
{self.data_scan_id})
+
+ operation = hook.update_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ body=self.body,
+ update_mask=self.update_mask,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex Data Quality scan %s updated
successfully!", self.data_scan_id)
+ except GoogleAPICallError as e:
+ raise AirflowException(f"Error creating Data Quality scan
{self.data_scan_id}", e)
+
+ return self.data_scan_id
+
+
+class DataplexGetDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Gets a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Dataplex data scan
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Retrieving the details of Dataplex Data Quality scan
%s", self.data_scan_id)
+ data_quality_scan = hook.get_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ return DataScan.to_dict(data_quality_scan)
+
+
+class DataplexDeleteDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Deletes a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: None
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context) -> None:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Deleting Dataplex Data Quality Scan: %s",
self.data_scan_id)
+
+ operation = hook.delete_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex Data Quality scan %s deleted successfully!",
self.data_scan_id)
+
+
+class DataplexRunDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Runs an on-demand execution of a DataScan.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param asynchronous: Flag informing that the Dataplex job should be run
asynchronously.
+ This is useful for submitting long-running jobs and
+ waiting on them asynchronously using the
DataplexDataQualityJobStatusSensor
+ :param fail_on_dq_failure: If set to true and not all Data Quality scan
rules have been passed,
+ an exception is thrown. If set to false and not all Data Quality scan
rules have been passed,
+ execution will finish with success.
+ :param result_timeout: Value in seconds for which operator will wait for
the Data Quality scan result
+ when the flag `asynchronous = False`.
+ Throws exception if there is no result found after specified amount of
seconds.
+
+ :return: Dataplex Data Quality scan job id.
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ asynchronous: bool = False,
+ fail_on_dq_failure: bool = False,
+ result_timeout: float = 60.0 * 10,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.asynchronous = asynchronous
+ self.fail_on_dq_failure = fail_on_dq_failure
+ self.result_timeout = result_timeout
+
+ def execute(self, context: Context) -> str:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ result = hook.run_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ job_id = result.job.name.split("/")[-1]
+ if not self.asynchronous:
+ job = hook.wait_for_data_scan_job(
+ job_id=job_id,
+ data_scan_id=self.data_scan_id,
+ project_id=self.project_id,
+ region=self.region,
+ result_timeout=self.result_timeout,
+ )
+
+ if job.state == DataScanJob.State.FAILED:
+ raise AirflowException(f"Data Quality job failed: {job_id}")
+ if job.state == DataScanJob.State.SUCCEEDED:
+ if not job.data_quality_result.passed:
+ if self.fail_on_dq_failure:
+ raise AirflowDataQualityScanException(
+ f"Data Quality job {job_id} execution failed due
to failure of its scanning "
+ f"rules: {self.data_scan_id}"
+ )
+ else:
+ self.log.info("Data Quality job executed successfully.")
+ else:
+ self.log.info("Data Quality job execution returned status:
%s", job.status)
+
+ return job_id
+
+
+class DataplexGetDataQualityScanResultOperator(GoogleCloudBaseOperator):
+ """
+ Gets a Data Scan Job resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param job_id: Optional. Data Quality scan job identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param fail_on_dq_failure: If set to true and not all Data Quality scan
rules have been passed,
+ an exception is thrown. If set to false and not all Data Quality scan
rules have been passed,
+ execution will finish with success.
+ :param wait_for_result: Flag indicating whether to wait for the result of
a job execution
+ or to return the job in its current state.
+ :param result_timeout: Value in seconds for which operator will wait for
the Data Quality scan result
+ when the flag `wait_for_result = True`.
+ Throws exception if there is no result found after specified amount of
seconds.
+
+ :return: Dict representing DataScanJob.
+ When the job completes with a successful status, information about the
Data Quality result
+ is available.
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ job_id: str | None = None,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ fail_on_dq_failure: bool = False,
+ wait_for_results: bool = True,
+ result_timeout: float = 60.0 * 10,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.job_id = job_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.fail_on_dq_failure = fail_on_dq_failure
+ self.wait_for_results = wait_for_results
+ self.result_timeout = result_timeout
+
+ def execute(self, context: Context) -> dict:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ # fetch the last job
+ if not self.job_id:
+ jobs = hook.list_data_scan_jobs(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ job_ids = [DataScanJob.to_dict(job) for job in jobs]
+ if not job_ids:
+ raise AirflowException("There are no jobs, you should create
one before.")
+ job_id = job_ids[0]["name"]
+ self.job_id = job_id.split("/")[-1]
+
+ if self.wait_for_results:
+ job = hook.wait_for_data_scan_job(
+ job_id=self.job_id,
+ data_scan_id=self.data_scan_id,
+ project_id=self.project_id,
+ region=self.region,
+ result_timeout=self.result_timeout,
+ )
+ else:
+ job = hook.get_data_scan_job(
+ project_id=self.project_id,
+ region=self.region,
+ job_id=self.job_id,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ if job.state == DataScanJob.State.SUCCEEDED:
+ if not job.data_quality_result.passed:
+ if self.fail_on_dq_failure:
+ raise AirflowDataQualityScanException(
+ f"Data Quality job {self.job_id} execution failed due
to failure of its scanning "
+ f"rules: {self.data_scan_id}"
+ )
+ else:
+ self.log.info("Data Quality job executed successfully")
+ else:
+ self.log.info("Data Quality job execution returned status: %s",
job.state)
+
+ result = DataScanJob.to_dict(job)
+ result["state"] = DataScanJob.State(result["state"]).name
+
+ return result
+
+
+class DataplexCreateZoneOperator(GoogleCloudBaseOperator):
+ """
+ Creates a Zone resource within a Lake.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param lake_id: Required. The ID of the Google Cloud lake that the task
belongs to.
+ :param body: Required. The Request body contains an instance of Zone.
+    :param zone_id: Required. Zone identifier.
+    :param api_version: The version of the API that will be requested, for example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Zone
+ """
+
+ template_fields = (
+ "project_id",
+ "zone_id",
+ "body",
+ "lake_id",
+ "impersonation_chain",
+ )
+ template_fields_renderers = {"body": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ body: dict[str, Any] | Zone,
+ zone_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.lake_id = lake_id
+ self.body = body
+ self.zone_id = zone_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Creating Dataplex zone %s", self.zone_id)
+
+ try:
+ operation = hook.create_zone(
+ project_id=self.project_id,
+ region=self.region,
+ lake_id=self.lake_id,
+ body=self.body,
+ zone_id=self.zone_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+            zone = hook.wait_for_operation(timeout=self.timeout, operation=operation)
+
+ except GoogleAPICallError as e:
+ raise AirflowException(f"Error occurred when creating zone
{self.zone_id}", e)
+
+ self.log.info("Dataplex zone %s created successfully!", self.zone_id)
+ return Zone.to_dict(zone)
+
+
+class DataplexDeleteZoneOperator(GoogleCloudBaseOperator):
+ """
+ Deletes a Zone resource. All assets within a zone must be deleted before
the zone can be deleted.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param lake_id: Required. The ID of the Google Cloud lake that the task
belongs to.
+ :param zone_id: Required. Zone identifier.
+    :param api_version: The version of the API that will be requested, for example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :return: None
+ """
+
+ template_fields = (
+ "project_id",
+ "lake_id",
+ "zone_id",
+ "impersonation_chain",
+ )
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ zone_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.lake_id = lake_id
+ self.zone_id = zone_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Deleting Dataplex zone %s", self.zone_id)
+
+ operation = hook.delete_zone(
+ project_id=self.project_id,
+ region=self.region,
+ lake_id=self.lake_id,
+ zone_id=self.zone_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex zone %s deleted successfully!", self.zone_id)
+
+
+class DataplexCreateAssetOperator(GoogleCloudBaseOperator):
+ """
+ Creates an Asset resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+    :param lake_id: Required. The ID of the Google Cloud lake that the asset belongs to.
+ :param zone_id: Required. Zone identifier.
+ :param asset_id: Required. Asset identifier.
+ :param body: Required. The Request body contains an instance of Asset.
+    :param api_version: The version of the API that will be requested, for example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :return: Asset
+ """
+
+ template_fields = (
+ "project_id",
+ "zone_id",
+ "asset_id",
+ "body",
+ "impersonation_chain",
+ )
+ template_fields_renderers = {"body": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ body: dict[str, Any] | Asset,
+ zone_id: str,
+ asset_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.lake_id = lake_id
+ self.body = body
+ self.zone_id = zone_id
+ self.asset_id = asset_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Creating Dataplex asset %s", self.zone_id)
+ try:
+ operation = hook.create_asset(
+ project_id=self.project_id,
+ region=self.region,
+ lake_id=self.lake_id,
+ body=self.body,
+ zone_id=self.zone_id,
+ asset_id=self.asset_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+            result = hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ except GoogleAPICallError as e:
+ raise AirflowException(f"Error occurred when creating asset
{self.asset_id}", e)
+
+ self.log.info("Dataplex asset %s created successfully!", self.asset_id)
+ return Asset.to_dict(result)
+
+
+class DataplexDeleteAssetOperator(GoogleCloudBaseOperator):
+ """
+ Deletes an asset resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param lake_id: Required. The ID of the Google Cloud lake that the asset
belongs to.
+ :param zone_id: Required. Zone identifier.
+ :param asset_id: Required. Asset identifier.
+    :param api_version: The version of the API that will be requested, for example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: None
+ """
+
+ template_fields = (
+ "project_id",
+ "zone_id",
+ "asset_id",
+ "impersonation_chain",
+ )
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ zone_id: str,
+ asset_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.lake_id = lake_id
+ self.zone_id = zone_id
+ self.asset_id = asset_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Deleting Dataplex asset %s", self.asset_id)
+
+ operation = hook.delete_asset(
+ project_id=self.project_id,
+ region=self.region,
+ lake_id=self.lake_id,
+ zone_id=self.zone_id,
+ asset_id=self.asset_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex asset %s deleted successfully!", self.asset_id)
diff --git a/airflow/providers/google/cloud/sensors/dataplex.py
b/airflow/providers/google/cloud/sensors/dataplex.py
index d710d134c3..887c7e47c6 100644
--- a/airflow/providers/google/cloud/sensors/dataplex.py
+++ b/airflow/providers/google/cloud/sensors/dataplex.py
@@ -17,16 +17,22 @@
"""This module contains Google Dataplex sensors."""
from __future__ import annotations
+import time
from typing import TYPE_CHECKING, Sequence
if TYPE_CHECKING:
from airflow.utils.context import Context
-
+from google.api_core.exceptions import GoogleAPICallError
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry
+from google.cloud.dataplex_v1.types import DataScanJob
from airflow.exceptions import AirflowException
-from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.providers.google.cloud.hooks.dataplex import (
+ AirflowDataQualityScanException,
+ AirflowDataQualityScanResultTimeoutException,
+ DataplexHook,
+)
from airflow.sensors.base import BaseSensorOperator
@@ -114,3 +120,119 @@ class DataplexTaskStateSensor(BaseSensorOperator):
self.log.info("Current status of the Dataplex task %s => %s",
self.dataplex_task_id, task_status)
return task_status == TaskState.ACTIVE
+
+
+class DataplexDataQualityJobStatusSensor(BaseSensorOperator):
+ """
+ Check the status of the Dataplex DataQuality job.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param data_scan_id: Required. Data Quality scan identifier.
+ :param job_id: Required. Job ID.
+    :param api_version: The version of the API that will be requested, for example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param result_timeout: Value in seconds for which operator will wait for
the Data Quality scan result.
+ Throws exception if there is no result found after specified amount of
seconds.
+ :param fail_on_dq_failure: If set to true and not all Data Quality scan
rules have been passed,
+ an exception is thrown. If set to false and not all Data Quality scan
rules have been passed,
+ execution will finish with success.
+
+ :return: Boolean indicating if the job run has reached the
``DataScanJob.State.SUCCEEDED``.
+ """
+
+ template_fields = ["job_id"]
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ job_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ fail_on_dq_failure: bool = False,
+ result_timeout: float = 60.0 * 10,
+        start_sensor_time: float | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.job_id = job_id
+ self.api_version = api_version
+ self.retry = retry
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.fail_on_dq_failure = fail_on_dq_failure
+ self.result_timeout = result_timeout
+ self.start_sensor_time = start_sensor_time
+
+ def execute(self, context: Context) -> None:
+ super().execute(context)
+
+    def _duration(self):
+        if not self.start_sensor_time:
+            self.start_sensor_time = time.monotonic()
+        return time.monotonic() - self.start_sensor_time
+
+    def poke(self, context: Context) -> bool:
+        self.log.info("Waiting for job %s to be %s", self.job_id, DataScanJob.State.SUCCEEDED)
+        if self.result_timeout:
+            duration = self._duration()
+            if duration > self.result_timeout:
+                raise AirflowDataQualityScanResultTimeoutException(
+                    f"Timeout: Data Quality scan {self.job_id} is not ready after {self.result_timeout}s"
+                )
+
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        try:
+            job = hook.get_data_scan_job(
+                project_id=self.project_id,
+                region=self.region,
+                data_scan_id=self.data_scan_id,
+                job_id=self.job_id,
+                timeout=self.timeout,
+                retry=self.retry,
+                metadata=self.metadata,
+            )
+        except GoogleAPICallError as e:
+            raise AirflowException(
+                f"Error occurred when trying to retrieve Data Quality scan job: {self.data_scan_id}", e
+            )
+
+        job_status = job.state
+        self.log.info("Current status of the Dataplex Data Quality scan job %s => %s", self.job_id, job_status)
+        if job_status == DataScanJob.State.FAILED:
+            raise AirflowException(f"Data Quality scan job failed: {self.job_id}")
+        if job_status == DataScanJob.State.CANCELLED:
+            raise AirflowException(f"Data Quality scan job cancelled: {self.job_id}")
+        if self.fail_on_dq_failure:
+            if job_status == DataScanJob.State.SUCCEEDED and not job.data_quality_result.passed:
+                raise AirflowDataQualityScanException(
+                    f"Data Quality job {self.job_id} execution failed due to failure of its scanning "
+                    f"rules: {self.data_scan_id}"
+                )
+        return job_status == DataScanJob.State.SUCCEEDED
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
index c6ed5db7da..05b96f1a1d 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -129,7 +129,6 @@ With this configuration we can create the lake:
:start-after: [START howto_dataplex_create_lake_operator]
:end-before: [END howto_dataplex_create_lake_operator]
-
Delete a lake
-------------
@@ -142,3 +141,167 @@ To delete a lake you can use:
:dedent: 4
:start-after: [START howto_dataplex_delete_lake_operator]
:end-before: [END howto_dataplex_delete_lake_operator]
+
+Create or update a Data Quality scan
+------------------------------------
+
+Before you create a Dataplex Data Quality scan, you need to define its body.
+For more information about the available fields to pass when creating a Data Quality scan, visit `Dataplex create data quality API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans#DataScan>`__
+
+A simple Data Quality scan configuration can look as follows:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_dataplex_data_quality_configuration]
+ :end-before: [END howto_dataplex_data_quality_configuration]
+
+With this configuration we can create or update the Data Quality scan:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateOrUpdateDataQualityScanOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_create_data_quality_operator]
+ :end-before: [END howto_dataplex_create_data_quality_operator]
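+
+The same operator can also update an existing Data Quality scan when an
+``update_mask`` is passed together with the changed body. A minimal sketch,
+mirroring the ``update_data_scan`` task in the example DAG below (the ids and
+body values are illustrative):
+
+.. code-block:: python
+
+    from google.protobuf.field_mask_pb2 import FieldMask
+
+    update_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
+        task_id="update_data_scan",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+        body=EXAMPLE_DATA_SCAN_UPDATE,
+        update_mask=FieldMask(paths=["data_quality_spec"]),
+    )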
+
+Get a Data Quality scan
+-----------------------
+
+To get a Data Quality scan you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_get_data_quality_operator]
+ :end-before: [END howto_dataplex_get_data_quality_operator]
+
+Delete a Data Quality scan
+--------------------------
+
+To delete a Data Quality scan you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteDataQualityScanOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_delete_data_quality_operator]
+ :end-before: [END howto_dataplex_delete_data_quality_operator]
+
+Run a Data Quality scan
+-----------------------
+
+You can run a Dataplex Data Quality scan in asynchronous mode and check its status later using a sensor:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexRunDataQualityScanOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_run_data_quality_operator]
+ :end-before: [END howto_dataplex_run_data_quality_operator]
+
+To check that a Dataplex Data Quality scan run succeeded you can use:
+
+:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexDataQualityJobStatusSensor`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_data_scan_job_state_sensor]
+ :end-before: [END howto_dataplex_data_scan_job_state_sensor]
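+
+When the scan is run asynchronously, the job id returned by
+``DataplexRunDataQualityScanOperator`` can be pulled from XCom into the
+sensor's templated ``job_id`` field. A minimal sketch, mirroring the example
+DAG below (task ids are illustrative):
+
+.. code-block:: python
+
+    get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
+        task_id="get_data_scan_job_status",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+        job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
+    )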
+
+Get a Data Quality scan job
+---------------------------
+
+To get a Data Quality scan job you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanResultOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_get_data_quality_job_operator]
+ :end-before: [END howto_dataplex_get_data_quality_job_operator]
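+
+The provider's unit tests also exercise this operator with an explicit
+``job_id`` and ``wait_for_results=False``, which presumably fetches the job
+without waiting for the scan outcome. A hedged sketch (``JOB_ID`` is a
+placeholder):
+
+.. code-block:: python
+
+    get_data_scan_job = DataplexGetDataQualityScanResultOperator(
+        task_id="get_data_scan_job",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+        job_id=JOB_ID,
+        wait_for_results=False,
+    )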
+
+Create a zone
+-------------
+
+Before you create a Dataplex zone, you need to define its body.
+
+For more information about the available fields to pass when creating a zone, visit `Dataplex create zone API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones#Zone>`__
+
+A simple zone configuration can look as follows:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_dataplex_zone_configuration]
+ :end-before: [END howto_dataplex_zone_configuration]
+
+With this configuration we can create a zone:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateZoneOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_create_zone_operator]
+ :end-before: [END howto_dataplex_create_zone_operator]
+
+Delete a zone
+-------------
+
+To delete a zone you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteZoneOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_delete_zone_operator]
+ :end-before: [END howto_dataplex_delete_zone_operator]
+
+Create an asset
+---------------
+
+Before you create a Dataplex asset, you need to define its body.
+
+For more information about the available fields to pass when creating an asset, visit `Dataplex create asset API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones.assets#Asset>`__
+
+A simple asset configuration can look as follows:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_dataplex_asset_configuration]
+ :end-before: [END howto_dataplex_asset_configuration]
+
+With this configuration we can create the asset:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateAssetOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_create_asset_operator]
+ :end-before: [END howto_dataplex_create_asset_operator]
+
+Delete an asset
+---------------
+
+To delete an asset you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteAssetOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_delete_asset_operator]
+ :end-before: [END howto_dataplex_delete_asset_operator]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index cd62151566..7a48585982 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -373,6 +373,8 @@ datapoint
Dataprep
Dataproc
dataproc
+DataScan
+dataScans
Dataset
dataset
datasetId
@@ -485,6 +487,7 @@ DOS'ing
DownloadReportV
downscaling
downstreams
+dq
Drillbit
Drivy
dropdown
diff --git a/tests/providers/google/cloud/hooks/test_dataplex.py b/tests/providers/google/cloud/hooks/test_dataplex.py
index 809565d721..deca942a02 100644
--- a/tests/providers/google/cloud/hooks/test_dataplex.py
+++ b/tests/providers/google/cloud/hooks/test_dataplex.py
@@ -26,6 +26,10 @@ from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_
BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
DATAPLEX_STRING = "airflow.providers.google.cloud.hooks.dataplex.{}"
+DATAPLEX_HOOK_CLIENT = "airflow.providers.google.cloud.hooks.dataplex.DataplexHook.get_dataplex_client"
+DATAPLEX_HOOK_DS_CLIENT = (
+    "airflow.providers.google.cloud.hooks.dataplex.DataplexHook.get_dataplex_data_scan_client"
+)
PROJECT_ID = "project-id"
REGION = "region"
@@ -36,6 +40,17 @@ DATAPLEX_TASK_ID = "testTask001"
GCP_CONN_ID = "google_cloud_default"
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+DATA_SCAN_ID = "test-data-scan-id"
+ASSET_ID = "test_asset_id"
+ZONE_ID = "test_zone_id"
+JOB_ID = "job_id"
+DATA_SCAN_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/dataScans/{DATA_SCAN_ID}"
+DATA_SCAN_JOB_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/dataScans/{DATA_SCAN_ID}/jobs/{JOB_ID}"
+ZONE_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}"
+ZONE_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}"
+ASSET_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/assets/{ASSET_ID}"
+DATASCAN_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"
+
class TestDataplexHook:
def test_delegate_to_runtime_error(self):
@@ -52,7 +67,7 @@ class TestDataplexHook:
impersonation_chain=IMPERSONATION_CHAIN,
)
- @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
def test_create_task(self, mock_client):
self.hook.create_task(
project_id=PROJECT_ID,
@@ -75,7 +90,7 @@ class TestDataplexHook:
metadata=(),
)
- @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
def test_delete_task(self, mock_client):
self.hook.delete_task(
            project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, dataplex_task_id=DATAPLEX_TASK_ID
@@ -91,7 +106,7 @@ class TestDataplexHook:
metadata=(),
)
- @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
def test_list_tasks(self, mock_client):
        self.hook.list_tasks(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
@@ -109,7 +124,7 @@ class TestDataplexHook:
metadata=(),
)
- @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
def test_get_task(self, mock_client):
self.hook.get_task(
            project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, dataplex_task_id=DATAPLEX_TASK_ID
@@ -125,7 +140,7 @@ class TestDataplexHook:
metadata=(),
)
- @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
def test_create_lake(self, mock_client):
self.hook.create_lake(
project_id=PROJECT_ID,
@@ -147,7 +162,7 @@ class TestDataplexHook:
metadata=(),
)
- @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
def test_delete_lake(self, mock_client):
        self.hook.delete_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
@@ -161,7 +176,7 @@ class TestDataplexHook:
metadata=(),
)
- @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
def test_get_lake(self, mock_client):
        self.hook.get_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
@@ -174,3 +189,129 @@ class TestDataplexHook:
timeout=None,
metadata=(),
)
+
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
+ def test_create_zone(self, mock_client):
+        self.hook.create_zone(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, zone_id=ZONE_ID, body={})
+
+ mock_client.return_value.create_zone.assert_called_once_with(
+ request=dict(
+ parent=ZONE_NAME,
+ zone_id=ZONE_ID,
+ zone={},
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
+ def test_delete_zone(self, mock_client):
+        self.hook.delete_zone(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, zone_id=ZONE_ID)
+
+ mock_client.return_value.delete_zone.assert_called_once_with(
+ request=dict(
+ name=ZONE_PARENT,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
+ def test_create_asset(self, mock_client):
+ self.hook.create_asset(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ asset_id=ASSET_ID,
+ body={},
+ )
+
+ mock_client.return_value.create_asset.assert_called_once_with(
+ request=dict(
+ parent=ZONE_PARENT,
+ asset={},
+ asset_id=ASSET_ID,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_CLIENT)
+ def test_delete_asset(self, mock_client):
+ self.hook.delete_asset(
+            project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, zone_id=ZONE_ID, asset_id=ASSET_ID
+ )
+
+ mock_client.return_value.delete_asset.assert_called_once_with(
+ request=dict(
+ name=ASSET_PARENT,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+ def test_create_data_scan(self, mock_client):
+        self.hook.create_data_scan(project_id=PROJECT_ID, region=REGION, data_scan_id=DATA_SCAN_ID, body={})
+
+ mock_client.return_value.create_data_scan.assert_called_once_with(
+            request=dict(parent=DATASCAN_PARENT, data_scan_id=DATA_SCAN_ID, data_scan={}),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+ def test_run_data_scan(self, mock_client):
+        self.hook.run_data_scan(project_id=PROJECT_ID, region=REGION, data_scan_id=DATA_SCAN_ID)
+
+ mock_client.return_value.run_data_scan.assert_called_once_with(
+ request=dict(
+ name=DATA_SCAN_NAME,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+ def test_get_data_scan_job(self, mock_client):
+ self.hook.get_data_scan_job(
+            project_id=PROJECT_ID, region=REGION, job_id=JOB_ID, data_scan_id=DATA_SCAN_ID
+ )
+
+ mock_client.return_value.get_data_scan_job.assert_called_once_with(
+ request=dict(name=DATA_SCAN_JOB_NAME, view="FULL"),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+ def test_delete_data_scan(self, mock_client):
+        self.hook.delete_data_scan(project_id=PROJECT_ID, region=REGION, data_scan_id=DATA_SCAN_ID)
+
+ mock_client.return_value.delete_data_scan.assert_called_once_with(
+ request=dict(
+ name=DATA_SCAN_NAME,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+ def test_get_data_scan(self, mock_client):
+        self.hook.get_data_scan(project_id=PROJECT_ID, region=REGION, data_scan_id=DATA_SCAN_ID)
+
+ mock_client.return_value.get_data_scan.assert_called_once_with(
+ request=dict(name=DATA_SCAN_NAME, view="FULL"),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
diff --git a/tests/providers/google/cloud/operators/test_dataplex.py b/tests/providers/google/cloud/operators/test_dataplex.py
index a5e4b3248c..3d5d770b56 100644
--- a/tests/providers/google/cloud/operators/test_dataplex.py
+++ b/tests/providers/google/cloud/operators/test_dataplex.py
@@ -21,17 +21,28 @@ from unittest import mock
from google.api_core.gapic_v1.method import DEFAULT
from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCreateAssetOperator,
DataplexCreateLakeOperator,
+ DataplexCreateOrUpdateDataQualityScanOperator,
DataplexCreateTaskOperator,
+ DataplexCreateZoneOperator,
+ DataplexDeleteAssetOperator,
+ DataplexDeleteDataQualityScanOperator,
DataplexDeleteLakeOperator,
DataplexDeleteTaskOperator,
+ DataplexDeleteZoneOperator,
+ DataplexGetDataQualityScanResultOperator,
DataplexGetTaskOperator,
DataplexListTasksOperator,
+ DataplexRunDataQualityScanOperator,
)
HOOK_STR = "airflow.providers.google.cloud.operators.dataplex.DataplexHook"
TASK_STR = "airflow.providers.google.cloud.operators.dataplex.Task"
LAKE_STR = "airflow.providers.google.cloud.operators.dataplex.Lake"
+DATASCANJOB_STR = "airflow.providers.google.cloud.operators.dataplex.DataScanJob"
+ZONE_STR = "airflow.providers.google.cloud.operators.dataplex.Zone"
+ASSET_STR = "airflow.providers.google.cloud.operators.dataplex.Asset"
PROJECT_ID = "project-id"
REGION = "region"
@@ -48,6 +59,11 @@ DATAPLEX_TASK_ID = "testTask001"
GCP_CONN_ID = "google_cloud_default"
API_VERSION = "v1"
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+DATA_SCAN_ID = "test-data-scan-id"
+TASK_ID = "test_task_id"
+ASSET_ID = "test_asset_id"
+ZONE_ID = "test_zone_id"
+JOB_ID = "test_job_id"
class TestDataplexCreateTaskOperator:
@@ -243,3 +259,261 @@ class TestDataplexCreateLakeOperator:
timeout=None,
metadata=(),
)
+
+
+class TestDataplexRunDataQualityScanOperator:
+ @mock.patch(HOOK_STR)
+ @mock.patch(DATASCANJOB_STR)
+ def test_execute(self, mock_data_scan_job, hook_mock):
+ op = DataplexRunDataQualityScanOperator(
+ task_id="execute_data_scan",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.run_data_scan.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexGetDataQualityScanResultOperator:
+ @mock.patch(HOOK_STR)
+ @mock.patch(DATASCANJOB_STR)
+ def test_execute(self, mock_data_scan_job, hook_mock):
+ op = DataplexGetDataQualityScanResultOperator(
+ task_id="get_data_scan",
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_id=JOB_ID,
+ data_scan_id=DATA_SCAN_ID,
+ api_version=API_VERSION,
+ wait_for_results=False,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.get_data_scan_job.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_id=JOB_ID,
+ data_scan_id=DATA_SCAN_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexCreateAssetOperator:
+ @mock.patch(HOOK_STR)
+ @mock.patch(ASSET_STR)
+ def test_execute(self, asset_mock, hook_mock):
+ op = DataplexCreateAssetOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ asset_id=ASSET_ID,
+ body={},
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.wait_for_operation.return_value = None
+ asset_mock.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.create_asset.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ asset_id=ASSET_ID,
+ body={},
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexCreateZoneOperator:
+ @mock.patch(HOOK_STR)
+ @mock.patch(ZONE_STR)
+ def test_execute(self, zone_mock, hook_mock):
+ op = DataplexCreateZoneOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ body={},
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.wait_for_operation.return_value = None
+ zone_mock.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.create_zone.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ body={},
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexDeleteZoneOperator:
+ @mock.patch(HOOK_STR)
+ def test_execute(self, hook_mock):
+ op = DataplexDeleteZoneOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.delete_zone.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexDeleteAssetOperator:
+ @mock.patch(HOOK_STR)
+ def test_execute(self, hook_mock):
+ op = DataplexDeleteAssetOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ asset_id=ASSET_ID,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.delete_asset.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ asset_id=ASSET_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexDeleteDataQualityScanOperator:
+ @mock.patch(HOOK_STR)
+ def test_execute(self, hook_mock):
+ op = DataplexDeleteDataQualityScanOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.delete_data_scan.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexCreateDataQualityScanOperator:
+ @mock.patch(HOOK_STR)
+ def test_execute(self, hook_mock):
+ op = DataplexCreateOrUpdateDataQualityScanOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ body={},
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.create_data_scan.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ body={},
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
diff --git a/tests/providers/google/cloud/sensors/test_dataplex.py b/tests/providers/google/cloud/sensors/test_dataplex.py
index bde6a61be1..3b59fd1ac6 100644
--- a/tests/providers/google/cloud/sensors/test_dataplex.py
+++ b/tests/providers/google/cloud/sensors/test_dataplex.py
@@ -20,9 +20,14 @@ from unittest import mock
import pytest
from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.dataplex_v1.types import DataScanJob
from airflow import AirflowException
-from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor, TaskState
+from airflow.providers.google.cloud.sensors.dataplex import (
+ DataplexDataQualityJobStatusSensor,
+ DataplexTaskStateSensor,
+ TaskState,
+)
DATAPLEX_HOOK = "airflow.providers.google.cloud.sensors.dataplex.DataplexHook"
@@ -36,6 +41,9 @@ DATAPLEX_TASK_ID = "testTask001"
GCP_CONN_ID = "google_cloud_default"
API_VERSION = "v1"
IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+TEST_JOB_ID = "test_job_id"
+TEST_DATA_SCAN_ID = "test-data-scan-id"
+TIMEOUT = 120
class TestDataplexTaskStateSensor:
@@ -99,3 +107,40 @@ class TestDataplexTaskStateSensor:
retry=DEFAULT,
metadata=(),
)
+
+
+class TestDataplexDataQualityJobStatusSensor:
+ def run_job(self, state: int):
+ job = mock.Mock()
+ job.state = state
+ return job
+
+ @mock.patch(DATAPLEX_HOOK)
+ def test_done(self, mock_hook):
+ job = self.run_job(DataScanJob.State.SUCCEEDED)
+ mock_hook.return_value.get_data_scan_job.return_value = job
+
+ sensor = DataplexDataQualityJobStatusSensor(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ job_id=TEST_JOB_ID,
+ data_scan_id=TEST_DATA_SCAN_ID,
+ region=REGION,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ timeout=TIMEOUT,
+ )
+ result = sensor.poke(context={})
+
+ mock_hook.return_value.get_data_scan_job.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_id=TEST_JOB_ID,
+ data_scan_id=TEST_DATA_SCAN_ID,
+ timeout=TIMEOUT,
+ retry=DEFAULT,
+ metadata=(),
+ )
+
+ assert result
diff --git a/tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py b/tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
new file mode 100644
index 0000000000..4bcd9abbca
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
@@ -0,0 +1,343 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use Dataplex Data Quality scans.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from google.cloud import dataplex_v1
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateEmptyTableOperator,
+ BigQueryDeleteDatasetOperator,
+ BigQueryInsertJobOperator,
+)
+from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCreateAssetOperator,
+ DataplexCreateLakeOperator,
+ DataplexCreateOrUpdateDataQualityScanOperator,
+ DataplexCreateZoneOperator,
+ DataplexDeleteAssetOperator,
+ DataplexDeleteDataQualityScanOperator,
+ DataplexDeleteLakeOperator,
+ DataplexDeleteZoneOperator,
+ DataplexGetDataQualityScanOperator,
+ DataplexGetDataQualityScanResultOperator,
+ DataplexRunDataQualityScanOperator,
+)
+from airflow.providers.google.cloud.sensors.dataplex import DataplexDataQualityJobStatusSensor
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_dataplex_data_quality"
+
+LAKE_ID = f"test-lake-{ENV_ID}"
+REGION = "us-central1"
+
+DATASET_NAME = f"dataset_bq_{ENV_ID}"
+
+TABLE_1 = "table0"
+TABLE_2 = "table1"
+
+SCHEMA = [
+ {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
+ {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+ {"name": "dt", "type": "STRING", "mode": "NULLABLE"},
+]
+
+DATASET = DATASET_NAME
+INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
+INSERT_ROWS_QUERY = f"INSERT {DATASET}.{TABLE_1} VALUES (1, 'test test2', '{INSERT_DATE}');"
+LOCATION = "us"
+
+TRIGGER_SPEC_TYPE = "ON_DEMAND"
+
+ZONE_ID = "test-zone-id"
+DATA_SCAN_ID = "test-data-scan-id"
+
+EXAMPLE_LAKE_BODY = {
+ "display_name": "test_display_name",
+ "labels": [],
+ "description": "test_description",
+ "metastore": {"service": ""},
+}
+
+# [START howto_dataplex_zone_configuration]
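+# A zone of type RAW stores data in its original form; SINGLE_REGION keeps all
+# resources attached to the zone within one region.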
+EXAMPLE_ZONE = {
+ "type_": "RAW",
+ "resource_spec": {"location_type": "SINGLE_REGION"},
+}
+# [END howto_dataplex_zone_configuration]
+
+ASSET_ID = "test-asset-id"
+
+# [START howto_dataplex_asset_configuration]
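+# The asset attaches the BigQuery dataset created above to the zone and
+# enables Dataplex discovery on it.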
+EXAMPLE_ASSET = {
+    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET_NAME}", "type_": "BIGQUERY_DATASET"},
+ "discovery_spec": {"enabled": True},
+}
+# [END howto_dataplex_asset_configuration]
+
+# [START howto_dataplex_data_quality_configuration]
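+# A single range_expectation rule: values in the "value" column must fall
+# between 0 and 10000 to pass the VALIDITY dimension.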
+EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
+EXAMPLE_DATA_SCAN.data.entity = (
+    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
+)
+EXAMPLE_DATA_SCAN.data.resource = (
+    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
+)
+EXAMPLE_DATA_SCAN.data_quality_spec = {
+ "rules": [
+ {
+ "range_expectation": {
+ "min_value": "0",
+ "max_value": "10000",
+ },
+ "column": "value",
+ "dimension": "VALIDITY",
+ }
+ ],
+}
+# [END howto_dataplex_data_quality_configuration]
+UPDATE_MASK = FieldMask(paths=["data_quality_spec"])
+ENTITY = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
+EXAMPLE_DATA_SCAN_UPDATE = {
+ "data": {
+ "entity": ENTITY,
+        "resource": f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}",
+ },
+ "data_quality_spec": {
+ "rules": [
+ {
+ "range_expectation": {
+ "min_value": "1",
+ "max_value": "50000",
+ },
+ "column": "value",
+ "dimension": "VALIDITY",
+ }
+ ],
+ },
+}
+
+
+with models.DAG(
+ DAG_ID,
+ start_date=datetime(2021, 1, 1),
+ schedule="@once",
+ tags=["example", "dataplex", "data_quality"],
+) as dag:
+    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
+ create_table_1 = BigQueryCreateEmptyTableOperator(
+ task_id="create_table_1",
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_1,
+ schema_fields=SCHEMA,
+ location=LOCATION,
+ )
+ create_table_2 = BigQueryCreateEmptyTableOperator(
+ task_id="create_table_2",
+ dataset_id=DATASET_NAME,
+ table_id=TABLE_2,
+ schema_fields=SCHEMA,
+ location=LOCATION,
+ )
+ insert_query_job = BigQueryInsertJobOperator(
+ task_id="insert_query_job",
+ configuration={
+ "query": {
+ "query": INSERT_ROWS_QUERY,
+ "useLegacySql": False,
+ }
+ },
+ )
+ create_lake = DataplexCreateLakeOperator(
+        task_id="create_lake", project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID
+ )
+ # [START howto_dataplex_create_zone_operator]
+ create_zone = DataplexCreateZoneOperator(
+ task_id="create_zone",
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ body=EXAMPLE_ZONE,
+ zone_id=ZONE_ID,
+ )
+ # [END howto_dataplex_create_zone_operator]
+ # [START howto_dataplex_create_asset_operator]
+ create_asset = DataplexCreateAssetOperator(
+ task_id="create_asset",
+ project_id=PROJECT_ID,
+ region=REGION,
+ body=EXAMPLE_ASSET,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ asset_id=ASSET_ID,
+ )
+ # [END howto_dataplex_create_asset_operator]
+ # [START howto_dataplex_create_data_quality_operator]
+ create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
+ task_id="create_data_scan",
+ project_id=PROJECT_ID,
+ region=REGION,
+ body=EXAMPLE_DATA_SCAN,
+ data_scan_id=DATA_SCAN_ID,
+ )
+ # [END howto_dataplex_create_data_quality_operator]
+ update_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
+ task_id="update_data_scan",
+ project_id=PROJECT_ID,
+ region=REGION,
+ update_mask=UPDATE_MASK,
+ body=EXAMPLE_DATA_SCAN_UPDATE,
+ data_scan_id=DATA_SCAN_ID,
+ )
+ # [START howto_dataplex_get_data_quality_operator]
+ get_data_scan = DataplexGetDataQualityScanOperator(
+ task_id="get_data_scan",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ )
+ # [END howto_dataplex_get_data_quality_operator]
+ run_data_scan_sync = DataplexRunDataQualityScanOperator(
+ task_id="run_data_scan_sync",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ )
+ get_data_scan_job_result = DataplexGetDataQualityScanResultOperator(
+ task_id="get_data_scan_job_result",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ )
+ # [START howto_dataplex_run_data_quality_operator]
+ run_data_scan_async = DataplexRunDataQualityScanOperator(
+ task_id="run_data_scan_async",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ asynchronous=True,
+ )
+ # [END howto_dataplex_run_data_quality_operator]
+ # [START howto_dataplex_data_scan_job_state_sensor]
+ get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
+ task_id="get_data_scan_job_status",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
+ )
+ # [END howto_dataplex_data_scan_job_state_sensor]
+ # [START howto_dataplex_get_data_quality_job_operator]
+ get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
+ task_id="get_data_scan_job_result_2",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ )
+ # [END howto_dataplex_get_data_quality_job_operator]
+ # [START howto_dataplex_delete_asset_operator]
+ delete_asset = DataplexDeleteAssetOperator(
+ task_id="delete_asset",
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ asset_id=ASSET_ID,
+ )
+ # [END howto_dataplex_delete_asset_operator]
+ delete_asset.trigger_rule = TriggerRule.ALL_DONE
+ # [START howto_dataplex_delete_zone_operator]
+ delete_zone = DataplexDeleteZoneOperator(
+ task_id="delete_zone",
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ zone_id=ZONE_ID,
+ )
+ # [END howto_dataplex_delete_zone_operator]
+ delete_zone.trigger_rule = TriggerRule.ALL_DONE
+ # [START howto_dataplex_delete_data_quality_operator]
+ delete_data_scan = DataplexDeleteDataQualityScanOperator(
+ task_id="delete_data_scan",
+ project_id=PROJECT_ID,
+ region=REGION,
+ data_scan_id=DATA_SCAN_ID,
+ )
+ # [END howto_dataplex_delete_data_quality_operator]
+ delete_data_scan.trigger_rule = TriggerRule.ALL_DONE
+ delete_lake = DataplexDeleteLakeOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ task_id="delete_lake",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ delete_dataset = BigQueryDeleteDatasetOperator(
+ task_id="delete_dataset",
+ dataset_id=DATASET_NAME,
+ project_id=PROJECT_ID,
+ delete_contents=True,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ chain(
+ # TEST SETUP
+ create_dataset,
+ [create_table_1, create_table_2],
+ insert_query_job,
+ create_lake,
+ create_zone,
+ create_asset,
+ # TEST BODY
+ create_data_scan,
+ update_data_scan,
+ get_data_scan,
+ run_data_scan_sync,
+ get_data_scan_job_result,
+ run_data_scan_async,
+ get_data_scan_job_status,
+ get_data_scan_job_result_2,
+ # TEST TEARDOWN
+ delete_asset,
+ delete_zone,
+ delete_data_scan,
+ [delete_lake, delete_dataset],
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)