This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dfb2403ec4 Add Dataplex Data Quality operators. (#32256)
dfb2403ec4 is described below

commit dfb2403ec4b6d147ac31125631677cee9e12347e
Author: Beata Kossakowska <[email protected]>
AuthorDate: Mon Aug 14 00:21:14 2023 +0200

    Add Dataplex Data Quality operators. (#32256)
    
    
    
    ---------
    
    Co-authored-by: Beata Kossakowska <[email protected]>
---
 airflow/providers/google/cloud/hooks/dataplex.py   | 500 +++++++++++-
 .../providers/google/cloud/operators/dataplex.py   | 859 ++++++++++++++++++++-
 airflow/providers/google/cloud/sensors/dataplex.py | 126 ++-
 .../operators/cloud/dataplex.rst                   | 165 +++-
 docs/spelling_wordlist.txt                         |   3 +
 .../providers/google/cloud/hooks/test_dataplex.py  | 155 +++-
 .../google/cloud/operators/test_dataplex.py        | 274 +++++++
 .../google/cloud/sensors/test_dataplex.py          |  47 +-
 .../google/cloud/dataplex/example_dataplex_dq.py   | 343 ++++++++
 9 files changed, 2454 insertions(+), 18 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataplex.py 
b/airflow/providers/google/cloud/hooks/dataplex.py
index 405870fd31..5fbfb0bd6e 100644
--- a/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -17,20 +17,39 @@
 """This module contains Google Dataplex hook."""
 from __future__ import annotations
 
+import time
 from typing import Any, Sequence
 
 from google.api_core.client_options import ClientOptions
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.operation import Operation
 from google.api_core.retry import Retry
-from google.cloud.dataplex_v1 import DataplexServiceClient
-from google.cloud.dataplex_v1.types import Lake, Task
+from google.cloud.dataplex_v1 import DataplexServiceClient, 
DataScanServiceClient
+from google.cloud.dataplex_v1.types import (
+    Asset,
+    DataScan,
+    DataScanJob,
+    Lake,
+    Task,
+    Zone,
+)
+from google.protobuf.field_mask_pb2 import FieldMask
 from googleapiclient.discovery import Resource
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.common.consts import CLIENT_INFO
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
 
+PATH_DATA_SCAN = 
"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
+
+
+class AirflowDataQualityScanException(AirflowException):
+    """Raised when data quality scan rules fail."""
+
+
+class AirflowDataQualityScanResultTimeoutException(AirflowException):
+    """Raised when no result found after specified amount of seconds."""
+
 
 class DataplexHook(GoogleBaseHook):
     """
@@ -55,6 +74,7 @@ class DataplexHook(GoogleBaseHook):
         api_version: str = "v1",
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
+        location: str | None = None,
         **kwargs,
     ) -> None:
         if kwargs.get("delegate_to") is not None:
@@ -67,6 +87,7 @@ class DataplexHook(GoogleBaseHook):
             impersonation_chain=impersonation_chain,
         )
         self.api_version = api_version
+        self.location = location
 
     def get_dataplex_client(self) -> DataplexServiceClient:
         """Returns DataplexServiceClient."""
@@ -76,6 +97,14 @@ class DataplexHook(GoogleBaseHook):
             credentials=self.get_credentials(), client_info=CLIENT_INFO, 
client_options=client_options
         )
 
+    def get_dataplex_data_scan_client(self) -> DataScanServiceClient:
+        """Returns DataScanServiceClient."""
+        client_options = 
ClientOptions(api_endpoint="dataplex.googleapis.com:443")
+
+        return DataScanServiceClient(
+            credentials=self.get_credentials(), client_info=CLIENT_INFO, 
client_options=client_options
+        )
+
     def wait_for_operation(self, timeout: float | None, operation: Operation):
         """Waits for long-lasting operation to complete."""
         try:
@@ -361,3 +390,470 @@ class DataplexHook(GoogleBaseHook):
             metadata=metadata,
         )
         return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_zone(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        zone_id: str,
+        body: dict[str, Any] | Zone,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Creates a zone resource within a lake.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be 
retrieved.
+        :param body: Required. The Request body contains an instance of Zone.
+        :param zone_id: Required. Zone identifier.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_client()
+
+        name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
+        result = client.create_zone(
+            request={
+                "parent": name,
+                "zone": body,
+                "zone_id": zone_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_zone(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        zone_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Deletes a zone resource. All assets within a zone must be deleted 
before the zone can be deleted.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be 
retrieved.
+        :param zone_id: Required. Zone identifier.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_client()
+
+        name = 
f"projects/{project_id}/locations/{region}/lakes/{lake_id}/zones/{zone_id}"
+        operation = client.delete_zone(
+            request={"name": name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return operation
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_asset(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        zone_id: str,
+        asset_id: str,
+        body: dict[str, Any] | Asset,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Creates an asset resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be 
retrieved.
+        :param zone_id: Required. Zone identifier.
+        :param asset_id: Required. Asset identifier.
+        :param body: Required. The Request body contains an instance of Asset.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_client()
+
+        name = 
f"projects/{project_id}/locations/{region}/lakes/{lake_id}/zones/{zone_id}"
+        result = client.create_asset(
+            request={
+                "parent": name,
+                "asset": body,
+                "asset_id": asset_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_asset(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        asset_id: str,
+        zone_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Deletes an asset resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be 
retrieved.
+        :param zone_id: Required. Zone identifier.
+        :param asset_id: Required. Asset identifier.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_client()
+
+        name = 
f"projects/{project_id}/locations/{region}/lakes/{lake_id}/zones/{zone_id}/assets/{asset_id}"
+        result = client.delete_asset(
+            request={"name": name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_data_scan(
+        self,
+        project_id: str,
+        region: str,
+        body: dict[str, Any] | DataScan,
+        data_scan_id: str | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Creates a DataScan resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param body: Required. The Request body contains an instance of 
DataScan.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_data_scan_client()
+
+        parent = f"projects/{project_id}/locations/{region}"
+        result = client.create_data_scan(
+            request={
+                "parent": parent,
+                "data_scan": body,
+                "data_scan_id": data_scan_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def run_data_scan(
+        self,
+        project_id: str,
+        region: str,
+        data_scan_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Runs an on-demand execution of a DataScan.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, 
data_scan_id=data_scan_id)
+        result = client.run_data_scan(
+            request={
+                "name": name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_data_scan_job(
+        self,
+        project_id: str,
+        region: str,
+        data_scan_id: str | None = None,
+        job_id: str | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Gets a DataScan Job resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param job_id: Required. The resource name of the DataScanJob:
+            
projects/{project_id}/locations/{region}/dataScans/{data_scan_id}/jobs/{data_scan_job_id}
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_data_scan_client()
+
+        name = 
f"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}/jobs/{job_id}"
+        result = client.get_data_scan_job(
+            request={"name": name, "view": "FULL"},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    def wait_for_data_scan_job(
+        self,
+        data_scan_id: str,
+        job_id: str | None = None,
+        project_id: str | None = None,
+        region: str | None = None,
+        wait_time: int = 10,
+        result_timeout: float | None = None,
+    ) -> Any:
+        """
+        Wait for Dataplex data scan job.
+
+        :param job_id: Optional. The ID of the data scan job to wait for (defaults to None).
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param project_id: Optional. Google Cloud project ID.
+        :param wait_time: Number of seconds between checks.
+        :param result_timeout: Value in seconds for which operator will wait 
for the Data Quality scan result.
+            Throws exception if there is no result found after specified 
amount of seconds.
+        """
+        start = time.monotonic()
+        state = None
+        while state not in (
+            DataScanJob.State.CANCELLED,
+            DataScanJob.State.FAILED,
+            DataScanJob.State.SUCCEEDED,
+        ):
+            if result_timeout and start + result_timeout < time.monotonic():
+                raise AirflowDataQualityScanResultTimeoutException(
+                    f"Timeout: Data Quality scan {job_id} is not ready after 
{result_timeout}s"
+                )
+            time.sleep(wait_time)
+            try:
+                job = self.get_data_scan_job(
+                    job_id=job_id,
+                    data_scan_id=data_scan_id,
+                    project_id=project_id,
+                    region=region,
+                )
+                state = job.state
+            except Exception as err:
+                self.log.info("Retrying. Dataplex API returned error when 
waiting for job: %s", err)
+        return job
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_data_scan(
+        self,
+        project_id: str,
+        region: str,
+        data_scan_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Gets a DataScan resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, 
data_scan_id=data_scan_id)
+        result = client.get_data_scan(
+            request={"name": name, "view": "FULL"},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_data_scan(
+        self,
+        project_id: str,
+        region: str,
+        data_scan_id: str,
+        body: dict[str, Any] | DataScan,
+        update_mask: dict | FieldMask | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Updates a DataScan resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param body: Required. The Request body contains an instance of 
DataScan.
+        :param update_mask: Optional. Mask of fields to update. If not provided,
+            all modifiable fields of the DataScan are updated.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_data_scan_client()
+
+        full_scan_name = 
f"projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
+
+        if body:
+            if isinstance(body, DataScan):
+                body.name = full_scan_name
+            elif isinstance(body, dict):
+                body["name"] = full_scan_name
+            else:
+                raise AirflowException("Unable to set scan_name.")
+
+        if not update_mask:
+            update_mask = FieldMask(
+                paths=["data_quality_spec", "labels", "description", 
"display_name", "execution_spec"]
+            )
+
+        result = client.update_data_scan(
+            request={
+                "data_scan": body,
+                "update_mask": update_mask,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_data_scan(
+        self,
+        project_id: str,
+        region: str,
+        data_scan_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Deletes a DataScan resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, 
data_scan_id=data_scan_id)
+        result = client.delete_data_scan(
+            request={
+                "name": name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_data_scan_jobs(
+        self,
+        project_id: str,
+        region: str,
+        data_scan_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Lists DataScanJobs under the given DataScan.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
lake belongs to.
+        :param data_scan_id: Required. Data Quality scan identifier.
+        :param retry: A retry object used  to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_data_scan_client()
+
+        name = PATH_DATA_SCAN.format(project_id=project_id, region=region, 
data_scan_id=data_scan_id)
+        result = client.list_data_scan_jobs(
+            request={
+                "parent": name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
diff --git a/airflow/providers/google/cloud/operators/dataplex.py 
b/airflow/providers/google/cloud/operators/dataplex.py
index 7b50523e54..2b27cb70db 100644
--- a/airflow/providers/google/cloud/operators/dataplex.py
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -21,15 +21,19 @@ from __future__ import annotations
 from time import sleep
 from typing import TYPE_CHECKING, Any, Sequence
 
+from airflow import AirflowException
+
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
+from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry, exponential_sleep_generator
-from google.cloud.dataplex_v1.types import Lake, Task
+from google.cloud.dataplex_v1.types import Asset, DataScan, DataScanJob, Lake, 
Task, Zone
+from google.protobuf.field_mask_pb2 import FieldMask
 from googleapiclient.errors import HttpError
 
-from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.providers.google.cloud.hooks.dataplex import 
AirflowDataQualityScanException, DataplexHook
 from airflow.providers.google.cloud.links.dataplex import (
     DataplexLakeLink,
     DataplexTaskLink,
@@ -440,8 +444,7 @@ class DataplexCreateLakeOperator(GoogleCloudBaseOperator):
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
     :param asynchronous: Flag informing should the Dataplex lake be created 
asynchronously.
-        This is useful for long running creating lakes and
-        waiting on them asynchronously using the DataplexLakeSensor
+        This is useful for long-running lake creation.
     """
 
     template_fields = (
@@ -590,7 +593,6 @@ class DataplexDeleteLakeOperator(GoogleCloudBaseOperator):
         self.impersonation_chain = impersonation_chain
 
     def execute(self, context: Context) -> None:
-
         hook = DataplexHook(
             gcp_conn_id=self.gcp_conn_id,
             api_version=self.api_version,
@@ -610,3 +612,850 @@ class DataplexDeleteLakeOperator(GoogleCloudBaseOperator):
         DataplexLakeLink.persist(context=context, task_instance=self)
         hook.wait_for_operation(timeout=self.timeout, operation=operation)
         self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
+
+
class DataplexCreateOrUpdateDataQualityScanOperator(GoogleCloudBaseOperator):
    """
    Creates a DataScan resource; if a scan with the given id already exists, updates it instead.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param body: Required. The Request body contains an instance of DataScan.
    :param data_scan_id: Required. Data Quality scan identifier.
    :param update_mask: Mask of fields to update (used only on the update path).
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: Dataplex data scan id
    """

    template_fields = ("project_id", "data_scan_id", "body", "impersonation_chain")
    template_fields_renderers = {"body": "json"}

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        body: dict[str, Any] | DataScan,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        update_mask: dict | FieldMask | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        self.body = body
        self.update_mask = update_mask
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )

        self.log.info("Creating Dataplex Data Quality scan %s", self.data_scan_id)
        try:
            operation = hook.create_data_scan(
                project_id=self.project_id,
                region=self.region,
                data_scan_id=self.data_scan_id,
                body=self.body,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            hook.wait_for_operation(timeout=self.timeout, operation=operation)
            self.log.info("Dataplex Data Quality scan %s created successfully!", self.data_scan_id)
        except AlreadyExists:
            # BUG FIX: the argument was previously wrapped in a set literal
            # ({self.data_scan_id}), which logged "{'scan_id'}" instead of the id.
            self.log.info("Dataplex Data Quality scan already exists: %s", self.data_scan_id)

            operation = hook.update_data_scan(
                project_id=self.project_id,
                region=self.region,
                data_scan_id=self.data_scan_id,
                body=self.body,
                update_mask=self.update_mask,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            hook.wait_for_operation(timeout=self.timeout, operation=operation)
            self.log.info("Dataplex Data Quality scan %s updated successfully!", self.data_scan_id)
        except GoogleAPICallError as e:
            raise AirflowException(f"Error creating Data Quality scan {self.data_scan_id}", e)

        return self.data_scan_id
+
+
class DataplexGetDataQualityScanOperator(GoogleCloudBaseOperator):
    """
    Fetches a single DataScan resource and returns it as a plain dict.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param data_scan_id: Required. Data Quality scan identifier.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: Dataplex data scan
    """

    template_fields = ("project_id", "data_scan_id", "impersonation_chain")

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Connection / impersonation settings.
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.api_version = api_version
        # Target scan coordinates.
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        # Per-request call options.
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata

    def execute(self, context: Context):
        client_hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )

        self.log.info("Retrieving the details of Dataplex Data Quality scan %s", self.data_scan_id)
        scan = client_hook.get_data_scan(
            project_id=self.project_id,
            region=self.region,
            data_scan_id=self.data_scan_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        # Serialize the proto message so the value is XCom-friendly.
        return DataScan.to_dict(scan)
+
+
class DataplexDeleteDataQualityScanOperator(GoogleCloudBaseOperator):
    """
    Deletes a DataScan resource and blocks until the delete operation completes.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param data_scan_id: Required. Data Quality scan identifier.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: None
    """

    template_fields = ("project_id", "data_scan_id", "impersonation_chain")

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Connection / impersonation settings.
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self.impersonation_chain = impersonation_chain
        # Target scan coordinates.
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        # Per-request call options.
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata

    def execute(self, context: Context) -> None:
        client_hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )

        self.log.info("Deleting Dataplex Data Quality Scan: %s", self.data_scan_id)

        delete_op = client_hook.delete_data_scan(
            project_id=self.project_id,
            region=self.region,
            data_scan_id=self.data_scan_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        # The delete is a long-running operation; wait for it to finish.
        client_hook.wait_for_operation(timeout=self.timeout, operation=delete_op)
        self.log.info("Dataplex Data Quality scan %s deleted successfully!", self.data_scan_id)
+
+
class DataplexRunDataQualityScanOperator(GoogleCloudBaseOperator):
    """
    Runs an on-demand execution of a DataScan.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param data_scan_id: Required. Data Quality scan identifier.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param asynchronous: Flag informing that the Dataplex job should be run asynchronously.
        This is useful for submitting long-running jobs and
        waiting on them asynchronously using the DataplexDataQualityJobStatusSensor
    :param fail_on_dq_failure: If set to true and not all Data Quality scan rules have been passed,
        an exception is thrown. If set to false and not all Data Quality scan rules have been passed,
        execution will finish with success.
    :param result_timeout: Value in seconds for which operator will wait for the Data Quality scan result
        when the flag `asynchronous = False`.
        Throws exception if there is no result found after specified amount of seconds.

    :return: Dataplex Data Quality scan job id.
    """

    template_fields = ("project_id", "data_scan_id", "impersonation_chain")

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        asynchronous: bool = False,
        fail_on_dq_failure: bool = False,
        result_timeout: float = 60.0 * 10,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.asynchronous = asynchronous
        self.fail_on_dq_failure = fail_on_dq_failure
        self.result_timeout = result_timeout

    def execute(self, context: Context) -> str:
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )

        result = hook.run_data_scan(
            project_id=self.project_id,
            region=self.region,
            data_scan_id=self.data_scan_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        # The job name is fully qualified (projects/.../dataScans/.../jobs/<id>);
        # only the trailing id segment is returned / used for polling.
        job_id = result.job.name.split("/")[-1]
        if not self.asynchronous:
            job = hook.wait_for_data_scan_job(
                job_id=job_id,
                data_scan_id=self.data_scan_id,
                project_id=self.project_id,
                region=self.region,
                result_timeout=self.result_timeout,
            )

            if job.state == DataScanJob.State.FAILED:
                raise AirflowException(f"Data Quality job failed: {job_id}")
            if job.state == DataScanJob.State.SUCCEEDED:
                if not job.data_quality_result.passed:
                    if self.fail_on_dq_failure:
                        raise AirflowDataQualityScanException(
                            f"Data Quality job {job_id} execution failed due to failure of its scanning "
                            f"rules: {self.data_scan_id}"
                        )
                else:
                    self.log.info("Data Quality job executed successfully.")
            else:
                # BUG FIX: DataScanJob has no `status` attribute; `job.status`
                # raised AttributeError for non-terminal states. Use `state`,
                # consistent with the checks above.
                self.log.info("Data Quality job execution returned status: %s", job.state)

        return job_id
+
+
class DataplexGetDataQualityScanResultOperator(GoogleCloudBaseOperator):
    """
    Gets a Data Scan Job resource.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param data_scan_id: Required. Data Quality scan identifier.
    :param job_id: Optional. Data Quality scan job identifier. When not provided,
        the most recent job of the scan is used.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
        ``retry`` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param fail_on_dq_failure: If set to true and not all Data Quality scan rules have been passed,
        an exception is thrown. If set to false and not all Data Quality scan rules have been passed,
        execution will finish with success.
    :param wait_for_results: Flag indicating whether to wait for the result of a job execution
        or to return the job in its current state.
    :param result_timeout: Value in seconds for which operator will wait for the Data Quality scan result
        when the flag `wait_for_results = True`.
        Throws exception if there is no result found after specified amount of seconds.

    :return: Dict representing DataScanJob.
        When the job completes with a successful status, information about the Data Quality result
        is available.
    """

    template_fields = ("project_id", "data_scan_id", "impersonation_chain")

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        job_id: str | None = None,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        fail_on_dq_failure: bool = False,
        wait_for_results: bool = True,
        result_timeout: float = 60.0 * 10,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        self.job_id = job_id
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.fail_on_dq_failure = fail_on_dq_failure
        self.wait_for_results = wait_for_results
        self.result_timeout = result_timeout

    def execute(self, context: Context) -> dict:
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        # No explicit job id: fall back to the most recent job of this scan
        # (the list endpoint returns jobs newest-first).
        if not self.job_id:
            jobs = hook.list_data_scan_jobs(
                project_id=self.project_id,
                region=self.region,
                data_scan_id=self.data_scan_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            # Only the first job's name is needed; avoid converting every
            # returned job to a dict just to read one field.
            first_job = next(iter(jobs), None)
            if first_job is None:
                raise AirflowException("There are no jobs, you should create one before.")
            self.job_id = first_job.name.split("/")[-1]

        if self.wait_for_results:
            job = hook.wait_for_data_scan_job(
                job_id=self.job_id,
                data_scan_id=self.data_scan_id,
                project_id=self.project_id,
                region=self.region,
                result_timeout=self.result_timeout,
            )
        else:
            job = hook.get_data_scan_job(
                project_id=self.project_id,
                region=self.region,
                job_id=self.job_id,
                data_scan_id=self.data_scan_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )

        if job.state == DataScanJob.State.SUCCEEDED:
            if not job.data_quality_result.passed:
                if self.fail_on_dq_failure:
                    raise AirflowDataQualityScanException(
                        f"Data Quality job {self.job_id} execution failed due to failure of its scanning "
                        f"rules: {self.data_scan_id}"
                    )
            else:
                self.log.info("Data Quality job executed successfully")
        else:
            self.log.info("Data Quality job execution returned status: %s", job.state)

        result = DataScanJob.to_dict(job)
        # Replace the raw enum integer with its symbolic name for readability.
        result["state"] = DataScanJob.State(result["state"]).name

        return result
+
+
class DataplexCreateZoneOperator(GoogleCloudBaseOperator):
    """
    Creates a Zone resource within a Lake.

    :param project_id: Required. The ID of the Google Cloud project that the zone belongs to.
    :param region: Required. The ID of the Google Cloud region that the zone belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the zone belongs to.
    :param body: Required. The Request body contains an instance of Zone.
    :param zone_id: Required. Zone identifier.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: Zone
    """

    template_fields = (
        "project_id",
        "zone_id",
        "body",
        "lake_id",
        "impersonation_chain",
    )
    template_fields_renderers = {"body": "json"}

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        body: dict[str, Any] | Zone,
        zone_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.lake_id = lake_id
        self.body = body
        self.zone_id = zone_id
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        self.log.info("Creating Dataplex zone %s", self.zone_id)

        try:
            operation = hook.create_zone(
                project_id=self.project_id,
                region=self.region,
                lake_id=self.lake_id,
                body=self.body,
                zone_id=self.zone_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            # The long-running operation resolves to the created Zone message.
            zone = hook.wait_for_operation(timeout=self.timeout, operation=operation)

        except GoogleAPICallError as e:
            raise AirflowException(f"Error occurred when creating zone {self.zone_id}", e)

        self.log.info("Dataplex zone %s created successfully!", self.zone_id)
        return Zone.to_dict(zone)
+
+
class DataplexDeleteZoneOperator(GoogleCloudBaseOperator):
    """
    Deletes a Zone resource. All assets within a zone must be deleted before the zone can be deleted.

    :param project_id: Required. The ID of the Google Cloud project that the zone belongs to.
    :param region: Required. The ID of the Google Cloud region that the zone belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the zone belongs to.
    :param zone_id: Required. Zone identifier.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :return: None
    """

    template_fields = (
        "project_id",
        "lake_id",
        "zone_id",
        "impersonation_chain",
    )

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        zone_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.lake_id = lake_id
        self.zone_id = zone_id
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        self.log.info("Deleting Dataplex zone %s", self.zone_id)

        operation = hook.delete_zone(
            project_id=self.project_id,
            region=self.region,
            lake_id=self.lake_id,
            zone_id=self.zone_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        # The delete is a long-running operation; block until it completes.
        hook.wait_for_operation(timeout=self.timeout, operation=operation)
        self.log.info("Dataplex zone %s deleted successfully!", self.zone_id)
+
+
class DataplexCreateAssetOperator(GoogleCloudBaseOperator):
    """
    Creates an Asset resource.

    :param project_id: Required. The ID of the Google Cloud project that the asset belongs to.
    :param region: Required. The ID of the Google Cloud region that the asset belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the asset belongs to.
    :param zone_id: Required. Zone identifier.
    :param asset_id: Required. Asset identifier.
    :param body: Required. The Request body contains an instance of Asset.
    :param api_version: The version of the api that will be requested for example 'v3'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :return: Asset
    """

    template_fields = (
        "project_id",
        "zone_id",
        "asset_id",
        "body",
        "impersonation_chain",
    )
    template_fields_renderers = {"body": "json"}

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        body: dict[str, Any] | Asset,
        zone_id: str,
        asset_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.lake_id = lake_id
        self.body = body
        self.zone_id = zone_id
        self.asset_id = asset_id
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        # Fix: log the asset identifier (previously logged self.zone_id), so the
        # task log points at the resource actually being created.
        self.log.info("Creating Dataplex asset %s", self.asset_id)
        try:
            operation = hook.create_asset(
                project_id=self.project_id,
                region=self.region,
                lake_id=self.lake_id,
                body=self.body,
                zone_id=self.zone_id,
                asset_id=self.asset_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            # Asset creation is a long-running operation; block until it resolves.
            result = hook.wait_for_operation(timeout=self.timeout, operation=operation)
        except GoogleAPICallError as e:
            raise AirflowException(f"Error occurred when creating asset {self.asset_id}", e)

        self.log.info("Dataplex asset %s created successfully!", self.asset_id)
        # Return a plain dict so the result is JSON-serializable for XCom.
        return Asset.to_dict(result)
+
+
class DataplexDeleteAssetOperator(GoogleCloudBaseOperator):
    """
    Deletes an asset resource.

    :param project_id: Required. The ID of the Google Cloud project that the asset belongs to.
    :param region: Required. The ID of the Google Cloud region that the asset belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the asset belongs to.
    :param zone_id: Required. Zone identifier.
    :param asset_id: Required. Asset identifier.
    :param api_version: The version of the api that will be requested for example 'v3'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: None
    """

    template_fields = (
        "project_id",
        "zone_id",
        "asset_id",
        "impersonation_chain",
    )

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        zone_id: str,
        asset_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.lake_id = lake_id
        self.zone_id = zone_id
        self.asset_id = asset_id
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def execute(self, context: Context):
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        self.log.info("Deleting Dataplex asset %s", self.asset_id)
        # Consistency fix: surface API failures as AirflowException, matching the
        # error handling of DataplexCreateAssetOperator.
        try:
            operation = hook.delete_asset(
                project_id=self.project_id,
                region=self.region,
                lake_id=self.lake_id,
                zone_id=self.zone_id,
                asset_id=self.asset_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            # Deletion is a long-running operation; block until it resolves.
            hook.wait_for_operation(timeout=self.timeout, operation=operation)
        except GoogleAPICallError as e:
            raise AirflowException(f"Error occurred when deleting asset {self.asset_id}", e)
        self.log.info("Dataplex asset %s deleted successfully!", self.asset_id)
diff --git a/airflow/providers/google/cloud/sensors/dataplex.py 
b/airflow/providers/google/cloud/sensors/dataplex.py
index d710d134c3..887c7e47c6 100644
--- a/airflow/providers/google/cloud/sensors/dataplex.py
+++ b/airflow/providers/google/cloud/sensors/dataplex.py
@@ -17,16 +17,22 @@
 """This module contains Google Dataplex sensors."""
 from __future__ import annotations
 
+import time
 from typing import TYPE_CHECKING, Sequence
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
-
+from google.api_core.exceptions import GoogleAPICallError
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry
+from google.cloud.dataplex_v1.types import DataScanJob
 
 from airflow.exceptions import AirflowException
-from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.providers.google.cloud.hooks.dataplex import (
+    AirflowDataQualityScanException,
+    AirflowDataQualityScanResultTimeoutException,
+    DataplexHook,
+)
 from airflow.sensors.base import BaseSensorOperator
 
 
@@ -114,3 +120,119 @@ class DataplexTaskStateSensor(BaseSensorOperator):
         self.log.info("Current status of the Dataplex task %s => %s", 
self.dataplex_task_id, task_status)
 
         return task_status == TaskState.ACTIVE
+
+
class DataplexDataQualityJobStatusSensor(BaseSensorOperator):
    """
    Check the status of the Dataplex DataQuality job.

    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
    :param region: Required. The ID of the Google Cloud region that the task belongs to.
    :param data_scan_id: Required. Data Quality scan identifier.
    :param job_id: Required. Job ID.
    :param api_version: The version of the api that will be requested for example 'v3'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param result_timeout: Value in seconds for which operator will wait for the Data Quality scan result.
        Throws exception if there is no result found after specified amount of seconds.
    :param fail_on_dq_failure: If set to true and not all Data Quality scan rules have been passed,
        an exception is thrown. If set to false and not all Data Quality scan rules have been passed,
        execution will finish with success.
    :param start_sensor_time: Optional ``time.monotonic()`` timestamp marking when the sensor
        started; when not supplied it is resolved lazily on the first poke.

    :return: Boolean indicating if the job run has reached the ``DataScanJob.State.SUCCEEDED``.
    """

    template_fields = ["job_id"]

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        job_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        fail_on_dq_failure: bool = False,
        result_timeout: float = 60.0 * 10,
        start_sensor_time: float | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        self.job_id = job_id
        self.api_version = api_version
        self.retry = retry
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.fail_on_dq_failure = fail_on_dq_failure
        self.result_timeout = result_timeout
        # BUG FIX: the previous default (``time.monotonic()`` evaluated in the
        # signature) was computed once at DAG-parse/import time, so
        # ``result_timeout`` was measured from file parsing instead of from when
        # the sensor actually started poking. Defer to the first poke instead.
        self.start_sensor_time = start_sensor_time

    def execute(self, context: Context) -> None:
        super().execute(context)

    def _duration(self) -> float:
        # Anchor the timeout clock lazily, at the first poke, not at import time.
        if self.start_sensor_time is None:
            self.start_sensor_time = time.monotonic()
        return time.monotonic() - self.start_sensor_time

    def poke(self, context: Context) -> bool:
        self.log.info("Waiting for job %s to be %s", self.job_id, DataScanJob.State.SUCCEEDED)
        if self.result_timeout:
            duration = self._duration()
            if duration > self.result_timeout:
                raise AirflowDataQualityScanResultTimeoutException(
                    f"Timeout: Data Quality scan {self.job_id} is not ready after {self.result_timeout}s"
                )

        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )

        try:
            job = hook.get_data_scan_job(
                project_id=self.project_id,
                region=self.region,
                data_scan_id=self.data_scan_id,
                job_id=self.job_id,
                timeout=self.timeout,
                retry=self.retry,
                metadata=self.metadata,
            )
        except GoogleAPICallError as e:
            raise AirflowException(
                f"Error occurred when trying to retrieve Data Quality scan job: {self.data_scan_id}", e
            )

        job_status = job.state
        self.log.info(
            "Current status of the Dataplex Data Quality scan job %s => %s", self.job_id, job_status
        )
        # Terminal failure states abort the sensor immediately instead of poking forever.
        if job_status == DataScanJob.State.FAILED:
            raise AirflowException(f"Data Quality scan job failed: {self.job_id}")
        if job_status == DataScanJob.State.CANCELLED:
            raise AirflowException(f"Data Quality scan job cancelled: {self.job_id}")
        if self.fail_on_dq_failure:
            # The job may SUCCEED while individual DQ rules still fail; optionally escalate.
            if job_status == DataScanJob.State.SUCCEEDED and not job.data_quality_result.passed:
                raise AirflowDataQualityScanException(
                    f"Data Quality job {self.job_id} execution failed due to failure of its scanning "
                    f"rules: {self.data_scan_id}"
                )
        return job_status == DataScanJob.State.SUCCEEDED
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst 
b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
index c6ed5db7da..05b96f1a1d 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -129,7 +129,6 @@ With this configuration we can create the lake:
     :start-after: [START howto_dataplex_create_lake_operator]
     :end-before: [END howto_dataplex_create_lake_operator]
 
-
 Delete a lake
 -------------
 
@@ -142,3 +141,167 @@ To delete a lake you can use:
     :dedent: 4
     :start-after: [START howto_dataplex_delete_lake_operator]
     :end-before: [END howto_dataplex_delete_lake_operator]
+
+Create or update a Data Quality scan
+------------------------------------
+
+Before you create a Dataplex Data Quality scan you need to define its body.
+For more information about the available fields to pass when creating a Data 
Quality scan, visit `Dataplex create data quality API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans#DataScan>`__
+
+A simple Data Quality scan configuration can look as follows:
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_data_quality_configuration]
+    :end-before: [END howto_dataplex_data_quality_configuration]
+
+With this configuration we can create or update the Data Quality scan:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateOrUpdateDataQualityScanOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_create_data_quality_operator]
+    :end-before: [END howto_dataplex_create_data_quality_operator]
+
+Get a Data Quality scan
+-----------------------
+
+To get a Data Quality scan you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_get_data_quality_operator]
+    :end-before: [END howto_dataplex_get_data_quality_operator]
+
+
+
+Delete a Data Quality scan
+--------------------------
+
+To delete a Data Quality scan you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteDataQualityScanOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_delete_data_quality_operator]
+    :end-before: [END howto_dataplex_delete_data_quality_operator]
+
+Run a Data Quality scan
+-----------------------
+
+You can run a Dataplex Data Quality scan in asynchronous mode to later check its status using a sensor:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexRunDataQualityScanOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_run_data_quality_operator]
+    :end-before: [END howto_dataplex_run_data_quality_operator]
+
+To check that running Dataplex Data Quality scan succeeded you can use:
+
+:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexDataQualityJobStatusSensor`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_data_scan_job_state_sensor]
+    :end-before: [END howto_dataplex_data_scan_job_state_sensor]
+
+Get a Data Quality scan job
+---------------------------
+
+To get a Data Quality scan job you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetDataQualityScanResultOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_get_data_quality_job_operator]
+    :end-before: [END howto_dataplex_get_data_quality_job_operator]
+
+Create a zone
+-------------
+
+Before you create a Dataplex zone you need to define its body.
+
+For more information about the available fields to pass when creating a zone, 
visit `Dataplex create zone API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones#Zone>`__
+
+A simple zone configuration can look as follows:
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_zone_configuration]
+    :end-before: [END howto_dataplex_zone_configuration]
+
+With this configuration we can create a zone:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateZoneOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_create_zone_operator]
+    :end-before: [END howto_dataplex_create_zone_operator]
+
+Delete a zone
+-------------
+
+To delete a zone you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteZoneOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_delete_zone_operator]
+    :end-before: [END howto_dataplex_delete_zone_operator]
+
+Create an asset
+---------------
+
+Before you create a Dataplex asset you need to define its body.
+
+For more information about the available fields to pass when creating an asset, 
visit `Dataplex create asset API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.zones.assets#Asset>`__
+
+A simple asset configuration can look as follows:
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_asset_configuration]
+    :end-before: [END howto_dataplex_asset_configuration]
+
+With this configuration we can create the asset:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateAssetOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_create_asset_operator]
+    :end-before: [END howto_dataplex_create_asset_operator]
+
+Delete an asset
+---------------
+
+To delete an asset you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteAssetOperator`
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_delete_asset_operator]
+    :end-before: [END howto_dataplex_delete_asset_operator]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index cd62151566..7a48585982 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -373,6 +373,8 @@ datapoint
 Dataprep
 Dataproc
 dataproc
+DataScan
+dataScans
 Dataset
 dataset
 datasetId
@@ -485,6 +487,7 @@ DOS'ing
 DownloadReportV
 downscaling
 downstreams
+dq
 Drillbit
 Drivy
 dropdown
diff --git a/tests/providers/google/cloud/hooks/test_dataplex.py 
b/tests/providers/google/cloud/hooks/test_dataplex.py
index 809565d721..deca942a02 100644
--- a/tests/providers/google/cloud/hooks/test_dataplex.py
+++ b/tests/providers/google/cloud/hooks/test_dataplex.py
@@ -26,6 +26,10 @@ from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_
 
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
 DATAPLEX_STRING = "airflow.providers.google.cloud.hooks.dataplex.{}"
+DATAPLEX_HOOK_CLIENT = 
"airflow.providers.google.cloud.hooks.dataplex.DataplexHook.get_dataplex_client"
+DATAPLEX_HOOK_DS_CLIENT = (
+    
"airflow.providers.google.cloud.hooks.dataplex.DataplexHook.get_dataplex_data_scan_client"
+)
 
 PROJECT_ID = "project-id"
 REGION = "region"
@@ -36,6 +40,17 @@ DATAPLEX_TASK_ID = "testTask001"
 GCP_CONN_ID = "google_cloud_default"
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
 
+DATA_SCAN_ID = "test-data-scan-id"
+ASSET_ID = "test_asset_id"
+ZONE_ID = "test_zone_id"
+JOB_ID = "job_id"
+DATA_SCAN_NAME = 
f"projects/{PROJECT_ID}/locations/{REGION}/dataScans/{DATA_SCAN_ID}"
+DATA_SCAN_JOB_NAME = 
f"projects/{PROJECT_ID}/locations/{REGION}/dataScans/{DATA_SCAN_ID}/jobs/{JOB_ID}"
+ZONE_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}"
+ZONE_PARENT = 
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}"
+ASSET_PARENT = 
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/assets/{ASSET_ID}"
+DATASCAN_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"
+
 
 class TestDataplexHook:
     def test_delegate_to_runtime_error(self):
@@ -52,7 +67,7 @@ class TestDataplexHook:
                 impersonation_chain=IMPERSONATION_CHAIN,
             )
 
-    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
     def test_create_task(self, mock_client):
         self.hook.create_task(
             project_id=PROJECT_ID,
@@ -75,7 +90,7 @@ class TestDataplexHook:
             metadata=(),
         )
 
-    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
     def test_delete_task(self, mock_client):
         self.hook.delete_task(
             project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, 
dataplex_task_id=DATAPLEX_TASK_ID
@@ -91,7 +106,7 @@ class TestDataplexHook:
             metadata=(),
         )
 
-    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
     def test_list_tasks(self, mock_client):
         self.hook.list_tasks(project_id=PROJECT_ID, region=REGION, 
lake_id=LAKE_ID)
 
@@ -109,7 +124,7 @@ class TestDataplexHook:
             metadata=(),
         )
 
-    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
     def test_get_task(self, mock_client):
         self.hook.get_task(
             project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, 
dataplex_task_id=DATAPLEX_TASK_ID
@@ -125,7 +140,7 @@ class TestDataplexHook:
             metadata=(),
         )
 
-    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
     def test_create_lake(self, mock_client):
         self.hook.create_lake(
             project_id=PROJECT_ID,
@@ -147,7 +162,7 @@ class TestDataplexHook:
             metadata=(),
         )
 
-    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
     def test_delete_lake(self, mock_client):
         self.hook.delete_lake(project_id=PROJECT_ID, region=REGION, 
lake_id=LAKE_ID)
 
@@ -161,7 +176,7 @@ class TestDataplexHook:
             metadata=(),
         )
 
-    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
     def test_get_lake(self, mock_client):
         self.hook.get_lake(project_id=PROJECT_ID, region=REGION, 
lake_id=LAKE_ID)
 
@@ -174,3 +189,129 @@ class TestDataplexHook:
             timeout=None,
             metadata=(),
         )
+
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
+    def test_create_zone(self, mock_client):
+        self.hook.create_zone(project_id=PROJECT_ID, region=REGION, 
lake_id=LAKE_ID, zone_id=ZONE_ID, body={})
+
+        mock_client.return_value.create_zone.assert_called_once_with(
+            request=dict(
+                parent=ZONE_NAME,
+                zone_id=ZONE_ID,
+                zone={},
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
+    def test_delete_zone(self, mock_client):
+        self.hook.delete_zone(project_id=PROJECT_ID, region=REGION, 
lake_id=LAKE_ID, zone_id=ZONE_ID)
+
+        mock_client.return_value.delete_zone.assert_called_once_with(
+            request=dict(
+                name=ZONE_PARENT,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
+    def test_create_asset(self, mock_client):
+        self.hook.create_asset(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            asset_id=ASSET_ID,
+            body={},
+        )
+
+        mock_client.return_value.create_asset.assert_called_once_with(
+            request=dict(
+                parent=ZONE_PARENT,
+                asset={},
+                asset_id=ASSET_ID,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_CLIENT)
+    def test_delete_asset(self, mock_client):
+        self.hook.delete_asset(
+            project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, 
zone_id=ZONE_ID, asset_id=ASSET_ID
+        )
+
+        mock_client.return_value.delete_asset.assert_called_once_with(
+            request=dict(
+                name=ASSET_PARENT,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+    def test_create_data_scan(self, mock_client):
+        self.hook.create_data_scan(project_id=PROJECT_ID, region=REGION, 
data_scan_id=DATA_SCAN_ID, body={})
+
+        mock_client.return_value.create_data_scan.assert_called_once_with(
+            request=dict(parent=DATASCAN_PARENT, data_scan_id=DATA_SCAN_ID, 
data_scan={}),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+    def test_run_data_scan(self, mock_client):
+        self.hook.run_data_scan(project_id=PROJECT_ID, region=REGION, 
data_scan_id=DATA_SCAN_ID)
+
+        mock_client.return_value.run_data_scan.assert_called_once_with(
+            request=dict(
+                name=DATA_SCAN_NAME,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+    def test_get_data_scan_job(self, mock_client):
+        self.hook.get_data_scan_job(
+            project_id=PROJECT_ID, region=REGION, job_id=JOB_ID, 
data_scan_id=DATA_SCAN_ID
+        )
+
+        mock_client.return_value.get_data_scan_job.assert_called_once_with(
+            request=dict(name=DATA_SCAN_JOB_NAME, view="FULL"),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+    def test_delete_data_scan(self, mock_client):
+        self.hook.delete_data_scan(project_id=PROJECT_ID, region=REGION, 
data_scan_id=DATA_SCAN_ID)
+
+        mock_client.return_value.delete_data_scan.assert_called_once_with(
+            request=dict(
+                name=DATA_SCAN_NAME,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_HOOK_DS_CLIENT)
+    def test_get_data_scan(self, mock_client):
+        self.hook.get_data_scan(project_id=PROJECT_ID, region=REGION, 
data_scan_id=DATA_SCAN_ID)
+
+        mock_client.return_value.get_data_scan.assert_called_once_with(
+            request=dict(name=DATA_SCAN_NAME, view="FULL"),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataplex.py 
b/tests/providers/google/cloud/operators/test_dataplex.py
index a5e4b3248c..3d5d770b56 100644
--- a/tests/providers/google/cloud/operators/test_dataplex.py
+++ b/tests/providers/google/cloud/operators/test_dataplex.py
@@ -21,17 +21,28 @@ from unittest import mock
 from google.api_core.gapic_v1.method import DEFAULT
 
 from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCreateAssetOperator,
     DataplexCreateLakeOperator,
+    DataplexCreateOrUpdateDataQualityScanOperator,
     DataplexCreateTaskOperator,
+    DataplexCreateZoneOperator,
+    DataplexDeleteAssetOperator,
+    DataplexDeleteDataQualityScanOperator,
     DataplexDeleteLakeOperator,
     DataplexDeleteTaskOperator,
+    DataplexDeleteZoneOperator,
+    DataplexGetDataQualityScanResultOperator,
     DataplexGetTaskOperator,
     DataplexListTasksOperator,
+    DataplexRunDataQualityScanOperator,
 )
 
 HOOK_STR = "airflow.providers.google.cloud.operators.dataplex.DataplexHook"
 TASK_STR = "airflow.providers.google.cloud.operators.dataplex.Task"
 LAKE_STR = "airflow.providers.google.cloud.operators.dataplex.Lake"
+DATASCANJOB_STR = 
"airflow.providers.google.cloud.operators.dataplex.DataScanJob"
+ZONE_STR = "airflow.providers.google.cloud.operators.dataplex.Zone"
+ASSET_STR = "airflow.providers.google.cloud.operators.dataplex.Asset"
 
 PROJECT_ID = "project-id"
 REGION = "region"
@@ -48,6 +59,11 @@ DATAPLEX_TASK_ID = "testTask001"
 GCP_CONN_ID = "google_cloud_default"
 API_VERSION = "v1"
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+DATA_SCAN_ID = "test-data-scan-id"
+TASK_ID = "test_task_id"
+ASSET_ID = "test_asset_id"
+ZONE_ID = "test_zone_id"
+JOB_ID = "test_job_id"
 
 
 class TestDataplexCreateTaskOperator:
@@ -243,3 +259,261 @@ class TestDataplexCreateLakeOperator:
             timeout=None,
             metadata=(),
         )
+
+
+class TestDataplexRunDataQualityScanOperator:
+    @mock.patch(HOOK_STR)
+    @mock.patch(DATASCANJOB_STR)
+    def test_execute(self, mock_data_scan_job, hook_mock):
+        op = DataplexRunDataQualityScanOperator(
+            task_id="execute_data_scan",
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.run_data_scan.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexGetDataQualityScanResultOperator:
+    @mock.patch(HOOK_STR)
+    @mock.patch(DATASCANJOB_STR)
+    def test_execute(self, mock_data_scan_job, hook_mock):
+        op = DataplexGetDataQualityScanResultOperator(
+            task_id="get_data_scan",
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_id=JOB_ID,
+            data_scan_id=DATA_SCAN_ID,
+            api_version=API_VERSION,
+            wait_for_results=False,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.get_data_scan_job.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_id=JOB_ID,
+            data_scan_id=DATA_SCAN_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCreateAssetOperator:
+    @mock.patch(HOOK_STR)
+    @mock.patch(ASSET_STR)
+    def test_execute(self, asset_mock, hook_mock):
+        op = DataplexCreateAssetOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            asset_id=ASSET_ID,
+            body={},
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.wait_for_operation.return_value = None
+        asset_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_asset.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            asset_id=ASSET_ID,
+            body={},
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCreateZoneOperator:
+    @mock.patch(HOOK_STR)
+    @mock.patch(ZONE_STR)
+    def test_execute(self, zone_mock, hook_mock):
+        op = DataplexCreateZoneOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            body={},
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.wait_for_operation.return_value = None
+        zone_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_zone.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            body={},
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexDeleteZoneOperator:
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexDeleteZoneOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_zone.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexDeleteAssetOperator:
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexDeleteAssetOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            asset_id=ASSET_ID,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_asset.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            zone_id=ZONE_ID,
+            asset_id=ASSET_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexDeleteDataQualityScanOperator:
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexDeleteDataQualityScanOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_data_scan.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCreateDataQualityScanOperator:
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexCreateOrUpdateDataQualityScanOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            body={},
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_data_scan.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            data_scan_id=DATA_SCAN_ID,
+            body={},
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/sensors/test_dataplex.py b/tests/providers/google/cloud/sensors/test_dataplex.py
index bde6a61be1..3b59fd1ac6 100644
--- a/tests/providers/google/cloud/sensors/test_dataplex.py
+++ b/tests/providers/google/cloud/sensors/test_dataplex.py
@@ -20,9 +20,14 @@ from unittest import mock
 
 import pytest
 from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.dataplex_v1.types import DataScanJob
 
 from airflow import AirflowException
-from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor, TaskState
+from airflow.providers.google.cloud.sensors.dataplex import (
+    DataplexDataQualityJobStatusSensor,
+    DataplexTaskStateSensor,
+    TaskState,
+)
 
 DATAPLEX_HOOK = "airflow.providers.google.cloud.sensors.dataplex.DataplexHook"
 
@@ -36,6 +41,9 @@ DATAPLEX_TASK_ID = "testTask001"
 GCP_CONN_ID = "google_cloud_default"
 API_VERSION = "v1"
 IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+TEST_JOB_ID = "test_job_id"
+TEST_DATA_SCAN_ID = "test-data-scan-id"
+TIMEOUT = 120
 
 
 class TestDataplexTaskStateSensor:
@@ -99,3 +107,40 @@ class TestDataplexTaskStateSensor:
             retry=DEFAULT,
             metadata=(),
         )
+
+
+class TestDataplexDataQualityJobStatusSensor:
+    def run_job(self, state: int):
+        job = mock.Mock()
+        job.state = state
+        return job
+
+    @mock.patch(DATAPLEX_HOOK)
+    def test_done(self, mock_hook):
+        job = self.run_job(DataScanJob.State.SUCCEEDED)
+        mock_hook.return_value.get_data_scan_job.return_value = job
+
+        sensor = DataplexDataQualityJobStatusSensor(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            job_id=TEST_JOB_ID,
+            data_scan_id=TEST_DATA_SCAN_ID,
+            region=REGION,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            timeout=TIMEOUT,
+        )
+        result = sensor.poke(context={})
+
+        mock_hook.return_value.get_data_scan_job.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_id=TEST_JOB_ID,
+            data_scan_id=TEST_DATA_SCAN_ID,
+            timeout=TIMEOUT,
+            retry=DEFAULT,
+            metadata=(),
+        )
+
+        assert result
diff --git a/tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py b/tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
new file mode 100644
index 0000000000..4bcd9abbca
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py
@@ -0,0 +1,343 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use Dataplex Data Quality scans.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from google.cloud import dataplex_v1
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateEmptyTableOperator,
+    BigQueryDeleteDatasetOperator,
+    BigQueryInsertJobOperator,
+)
+from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCreateAssetOperator,
+    DataplexCreateLakeOperator,
+    DataplexCreateOrUpdateDataQualityScanOperator,
+    DataplexCreateZoneOperator,
+    DataplexDeleteAssetOperator,
+    DataplexDeleteDataQualityScanOperator,
+    DataplexDeleteLakeOperator,
+    DataplexDeleteZoneOperator,
+    DataplexGetDataQualityScanOperator,
+    DataplexGetDataQualityScanResultOperator,
+    DataplexRunDataQualityScanOperator,
+)
+from airflow.providers.google.cloud.sensors.dataplex import DataplexDataQualityJobStatusSensor
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_dataplex_data_quality"
+
+LAKE_ID = f"test-lake-{ENV_ID}"
+REGION = "us-central1"
+
+DATASET_NAME = f"dataset_bq_{ENV_ID}"
+
+TABLE_1 = "table0"
+TABLE_2 = "table1"
+
+SCHEMA = [
+    {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
+    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+    {"name": "dt", "type": "STRING", "mode": "NULLABLE"},
+]
+
+DATASET = DATASET_NAME
+INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
+INSERT_ROWS_QUERY = f"INSERT {DATASET}.{TABLE_1} VALUES (1, 'test test2', '{INSERT_DATE}');"
+LOCATION = "us"
+
+TRIGGER_SPEC_TYPE = "ON_DEMAND"
+
+ZONE_ID = "test-zone-id"
+DATA_SCAN_ID = "test-data-scan-id"
+
+EXAMPLE_LAKE_BODY = {
+    "display_name": "test_display_name",
+    "labels": [],
+    "description": "test_description",
+    "metastore": {"service": ""},
+}
+
+# [START howto_dataplex_zone_configuration]
+EXAMPLE_ZONE = {
+    "type_": "RAW",
+    "resource_spec": {"location_type": "SINGLE_REGION"},
+}
+# [END howto_dataplex_zone_configuration]
+
+ASSET_ID = "test-asset-id"
+
+# [START howto_dataplex_asset_configuration]
+EXAMPLE_ASSET = {
+    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET_NAME}", "type_": "BIGQUERY_DATASET"},
+    "discovery_spec": {"enabled": True},
+}
+# [END howto_dataplex_asset_configuration]
+
+# [START howto_dataplex_data_quality_configuration]
+EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
+EXAMPLE_DATA_SCAN.data.entity = (
+    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
+)
+EXAMPLE_DATA_SCAN.data.resource = (
+    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
+)
+EXAMPLE_DATA_SCAN.data_quality_spec = {
+    "rules": [
+        {
+            "range_expectation": {
+                "min_value": "0",
+                "max_value": "10000",
+            },
+            "column": "value",
+            "dimension": "VALIDITY",
+        }
+    ],
+}
+# [END howto_dataplex_data_quality_configuration]
+UPDATE_MASK = FieldMask(paths=["data_quality_spec"])
+ENTITY = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
+EXAMPLE_DATA_SCAN_UPDATE = {
+    "data": {
+        "entity": ENTITY,
+        "resource": f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}",
+    },
+    "data_quality_spec": {
+        "rules": [
+            {
+                "range_expectation": {
+                    "min_value": "1",
+                    "max_value": "50000",
+                },
+                "column": "value",
+                "dimension": "VALIDITY",
+            }
+        ],
+    },
+}
+
+
+with models.DAG(
+    DAG_ID,
+    start_date=datetime(2021, 1, 1),
+    schedule="@once",
+    tags=["example", "dataplex", "data_quality"],
+) as dag:
+    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
+    create_table_1 = BigQueryCreateEmptyTableOperator(
+        task_id="create_table_1",
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_1,
+        schema_fields=SCHEMA,
+        location=LOCATION,
+    )
+    create_table_2 = BigQueryCreateEmptyTableOperator(
+        task_id="create_table_2",
+        dataset_id=DATASET_NAME,
+        table_id=TABLE_2,
+        schema_fields=SCHEMA,
+        location=LOCATION,
+    )
+    insert_query_job = BigQueryInsertJobOperator(
+        task_id="insert_query_job",
+        configuration={
+            "query": {
+                "query": INSERT_ROWS_QUERY,
+                "useLegacySql": False,
+            }
+        },
+    )
+    create_lake = DataplexCreateLakeOperator(
+        task_id="create_lake", project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID
+    )
+    # [START howto_dataplex_create_zone_operator]
+    create_zone = DataplexCreateZoneOperator(
+        task_id="create_zone",
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        body=EXAMPLE_ZONE,
+        zone_id=ZONE_ID,
+    )
+    # [END howto_dataplex_create_zone_operator]
+    # [START howto_dataplex_create_asset_operator]
+    create_asset = DataplexCreateAssetOperator(
+        task_id="create_asset",
+        project_id=PROJECT_ID,
+        region=REGION,
+        body=EXAMPLE_ASSET,
+        lake_id=LAKE_ID,
+        zone_id=ZONE_ID,
+        asset_id=ASSET_ID,
+    )
+    # [END howto_dataplex_create_asset_operator]
+    # [START howto_dataplex_create_data_quality_operator]
+    create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
+        task_id="create_data_scan",
+        project_id=PROJECT_ID,
+        region=REGION,
+        body=EXAMPLE_DATA_SCAN,
+        data_scan_id=DATA_SCAN_ID,
+    )
+    # [END howto_dataplex_create_data_quality_operator]
+    update_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
+        task_id="update_data_scan",
+        project_id=PROJECT_ID,
+        region=REGION,
+        update_mask=UPDATE_MASK,
+        body=EXAMPLE_DATA_SCAN_UPDATE,
+        data_scan_id=DATA_SCAN_ID,
+    )
+    # [START howto_dataplex_get_data_quality_operator]
+    get_data_scan = DataplexGetDataQualityScanOperator(
+        task_id="get_data_scan",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+    )
+    # [END howto_dataplex_get_data_quality_operator]
+    run_data_scan_sync = DataplexRunDataQualityScanOperator(
+        task_id="run_data_scan_sync",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+    )
+    get_data_scan_job_result = DataplexGetDataQualityScanResultOperator(
+        task_id="get_data_scan_job_result",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+    )
+    # [START howto_dataplex_run_data_quality_operator]
+    run_data_scan_async = DataplexRunDataQualityScanOperator(
+        task_id="run_data_scan_async",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+        asynchronous=True,
+    )
+    # [END howto_dataplex_run_data_quality_operator]
+    # [START howto_dataplex_data_scan_job_state_sensor]
+    get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
+        task_id="get_data_scan_job_status",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+        job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
+    )
+    # [END howto_dataplex_data_scan_job_state_sensor]
+    # [START howto_dataplex_get_data_quality_job_operator]
+    get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
+        task_id="get_data_scan_job_result_2",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+    )
+    # [END howto_dataplex_get_data_quality_job_operator]
+    # [START howto_dataplex_delete_asset_operator]
+    delete_asset = DataplexDeleteAssetOperator(
+        task_id="delete_asset",
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        zone_id=ZONE_ID,
+        asset_id=ASSET_ID,
+    )
+    # [END howto_dataplex_delete_asset_operator]
+    delete_asset.trigger_rule = TriggerRule.ALL_DONE
+    # [START howto_dataplex_delete_zone_operator]
+    delete_zone = DataplexDeleteZoneOperator(
+        task_id="delete_zone",
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        zone_id=ZONE_ID,
+    )
+    # [END howto_dataplex_delete_zone_operator]
+    delete_zone.trigger_rule = TriggerRule.ALL_DONE
+    # [START howto_dataplex_delete_data_quality_operator]
+    delete_data_scan = DataplexDeleteDataQualityScanOperator(
+        task_id="delete_data_scan",
+        project_id=PROJECT_ID,
+        region=REGION,
+        data_scan_id=DATA_SCAN_ID,
+    )
+    # [END howto_dataplex_delete_data_quality_operator]
+    delete_data_scan.trigger_rule = TriggerRule.ALL_DONE
+    delete_lake = DataplexDeleteLakeOperator(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        task_id="delete_lake",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        project_id=PROJECT_ID,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    chain(
+        # TEST SETUP
+        create_dataset,
+        [create_table_1, create_table_2],
+        insert_query_job,
+        create_lake,
+        create_zone,
+        create_asset,
+        # TEST BODY
+        create_data_scan,
+        update_data_scan,
+        get_data_scan,
+        run_data_scan_sync,
+        get_data_scan_job_result,
+        run_data_scan_async,
+        get_data_scan_job_status,
+        get_data_scan_job_result_2,
+        # TEST TEARDOWN
+        delete_asset,
+        delete_zone,
+        delete_data_scan,
+        [delete_lake, delete_dataset],
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to