VladaZakharova commented on code in PR #32256:
URL: https://github.com/apache/airflow/pull/32256#discussion_r1277708443
##########
airflow/providers/google/cloud/sensors/dataplex.py:
##########
@@ -114,3 +115,81 @@ def poke(self, context: Context) -> bool:
self.log.info("Current status of the Dataplex task %s => %s",
self.dataplex_task_id, task_status)
return task_status == TaskState.ACTIVE
+
+
+class DataplexDataQualityJobStatusSensor(BaseSensorOperator):
+ """
+ Check the status of the Dataplex data scan job is SUCCEEDED.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param data_scan_id: Required. DataScan identifier.
+ :param job_id: Required. Job ID.
+ :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :return: Boolean indicating if the job run has reached the
``DataScanJob.State.SUCCEEDED``.
+ """
+
+ template_fields = ["job_id"]
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ job_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.job_id = job_id
+ self.api_version = api_version
+ self.retry = retry
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def poke(self, context: Context) -> bool:
+ self.log.info("Waiting for job %s to be %s", self.job_id,
DataScanJob.State.SUCCEEDED)
+
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ job = hook.get_data_scan_job(
Review Comment:
Maybe add a try/except block here to cover cases where we can't retrieve
the data scan job because of some error?
##########
airflow/providers/google/cloud/operators/dataplex.py:
##########
@@ -610,3 +611,694 @@ def execute(self, context: Context) -> None:
DataplexLakeLink.persist(context=context, task_instance=self)
hook.wait_for_operation(timeout=self.timeout, operation=operation)
self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
+
+
+class DataplexCreateDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Creates a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan: Required. DataScan resource.
+ :param data_scan_id: Required. DataScan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Dataplex data scan id
+ """
+
+ template_fields = ("project_id", "data_scan_id", "data_scan",
"impersonation_chain")
+ template_fields_renderers = {"data_scan": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ data_scan: dict[str, Any],
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.data_scan = data_scan
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Creating Dataplex data scan %s", self.data_scan_id)
+ operation = hook.create_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ data_scan=self.data_scan,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex scan data %s created successfully!",
self.data_scan_id)
+ return self.data_scan_id
+
+
+class DataplexDeleteDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Deletes a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: None
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context) -> None:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Deleting Dataplex data scan job: %s", self.data_scan_id)
+
+ operation = hook.delete_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex data_scan %s deleted successfully!",
self.data_scan_id)
+
+
+class DataplexRunDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Runs an on-demand execution of a DataScan.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param asynchronous: Flag to return job_id from the Dataplex API.
Review Comment:
This parameter is not about returning the job_id. It describes how the operator
waits for the execution result. Please check the examples in other operators.
##########
airflow/providers/google/cloud/operators/dataplex.py:
##########
@@ -610,3 +611,694 @@ def execute(self, context: Context) -> None:
DataplexLakeLink.persist(context=context, task_instance=self)
hook.wait_for_operation(timeout=self.timeout, operation=operation)
self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
+
+
+class DataplexCreateDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Creates a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan: Required. DataScan resource.
+ :param data_scan_id: Required. DataScan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Dataplex data scan id
+ """
+
+ template_fields = ("project_id", "data_scan_id", "data_scan",
"impersonation_chain")
+ template_fields_renderers = {"data_scan": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ data_scan: dict[str, Any],
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.data_scan = data_scan
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Creating Dataplex data scan %s", self.data_scan_id)
+ operation = hook.create_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ data_scan=self.data_scan,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex scan data %s created successfully!",
self.data_scan_id)
+ return self.data_scan_id
+
+
+class DataplexDeleteDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Deletes a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: None
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context) -> None:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Deleting Dataplex data scan job: %s", self.data_scan_id)
+
+ operation = hook.delete_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex data_scan %s deleted successfully!",
self.data_scan_id)
+
+
+class DataplexRunDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Runs an on-demand execution of a DataScan.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param asynchronous: Flag to return job_id from the Dataplex API.
+ This is useful for submitting long-running pipelines and
+ waiting on them asynchronously using the
DataplexDataQualityJobStatusSensor
+
+ :return: Dataplex data scan job id.
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ asynchronous: bool = False,
+ failure_mode: bool = False,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.asynchronous = asynchronous
+ self.failure_mode = failure_mode
+
+ def execute(self, context: Context) -> str:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ result = hook.run_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ job_id = result.job.name.split("/")[-1]
+ if not self.asynchronous:
+ hook.wait_for_data_scan_job(
Review Comment:
In other operators you use the wait_for_operation() method, but in this operator
you are using wait_for_data_scan_job().
Could we use the wait_for_operation() method for all kinds of waiting on
operations?
##########
airflow/providers/google/cloud/operators/dataplex.py:
##########
@@ -610,3 +611,694 @@ def execute(self, context: Context) -> None:
DataplexLakeLink.persist(context=context, task_instance=self)
hook.wait_for_operation(timeout=self.timeout, operation=operation)
self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
+
+
+class DataplexCreateDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Creates a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan: Required. DataScan resource.
+ :param data_scan_id: Required. DataScan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Dataplex data scan id
+ """
+
+ template_fields = ("project_id", "data_scan_id", "data_scan",
"impersonation_chain")
+ template_fields_renderers = {"data_scan": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ data_scan: dict[str, Any],
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.data_scan = data_scan
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Creating Dataplex data scan %s", self.data_scan_id)
+ operation = hook.create_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ data_scan=self.data_scan,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex scan data %s created successfully!",
self.data_scan_id)
+ return self.data_scan_id
+
+
+class DataplexDeleteDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Deletes a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: None
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context) -> None:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Deleting Dataplex data scan job: %s", self.data_scan_id)
+
+ operation = hook.delete_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(timeout=self.timeout, operation=operation)
+ self.log.info("Dataplex data_scan %s deleted successfully!",
self.data_scan_id)
+
+
+class DataplexRunDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Runs an on-demand execution of a DataScan.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param asynchronous: Flag to return job_id from the Dataplex API.
+ This is useful for submitting long-running pipelines and
+ waiting on them asynchronously using the
DataplexDataQualityJobStatusSensor
+
+ :return: Dataplex data scan job id.
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ asynchronous: bool = False,
+ failure_mode: bool = False,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.asynchronous = asynchronous
+ self.failure_mode = failure_mode
+
+ def execute(self, context: Context) -> str:
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ result = hook.run_data_scan(
+ project_id=self.project_id,
+ region=self.region,
+ data_scan_id=self.data_scan_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ job_id = result.job.name.split("/")[-1]
+ if not self.asynchronous:
+ hook.wait_for_data_scan_job(
+ job_id=job_id,
+ data_scan_id=self.data_scan_id,
+ project_id=self.project_id,
+ region=self.region,
+ )
+ return job_id
+
+
class DataplexGetDataQualityScanResultOperator(GoogleCloudBaseOperator):
    """
    Gets a Data Scan Job result.

    When ``job_id`` is not provided, the jobs of the data scan are listed and the
    first job returned by the hook is used.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param data_scan_id: Required. Data scan identifier.
    :param job_id: Optional. Data scan job identifier. If omitted, the first job listed
        for the data scan is used (templated).
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: Dict representing Data scan job.
    :raises AirflowException: If the data scan has no jobs, or if the job is in the
        ``FAILED`` or ``CANCELLED`` state.
    """

    # ``job_id`` is templated so it can be pulled from an upstream task (e.g. via XCom).
    template_fields = ("project_id", "data_scan_id", "job_id", "impersonation_chain")

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        job_id: str | None = None,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        self.job_id = job_id
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    @staticmethod
    def _handle_job_error(job) -> None:
        """Raise ``AirflowException`` when ``job`` is in the FAILED or CANCELLED state."""
        if job.state == DataScanJob.State.FAILED:
            raise AirflowException(f"Job failed:\n{job.name}")
        if job.state == DataScanJob.State.CANCELLED:
            raise AirflowException(f"Job was cancelled:\n{job.name}")

    def execute(self, context: Context) -> dict:
        """Fetch the data scan job and return it as a dict, failing on a failed/cancelled job."""
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )

        if not self.job_id:
            # No job was requested explicitly: fall back to the first job listed for the scan.
            # NOTE(review): presumably the API lists the most recent job first - confirm.
            jobs = hook.list_data_scan_jobs(
                project_id=self.project_id,
                region=self.region,
                data_scan_id=self.data_scan_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            # Only the first entry is needed, so do not materialize the whole listing.
            first_job = next(iter(jobs), None)
            if first_job is None:
                raise AirflowException("Create a job before.")
            # The job ID is the last path segment of the job's resource name.
            self.job_id = first_job.name.split("/")[-1]

        job = hook.get_data_scan_job(
            project_id=self.project_id,
            region=self.region,
            job_id=self.job_id,
            data_scan_id=self.data_scan_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )

        self._handle_job_error(job)
        return MessageToDict(job._pb)
+
+
+class DataplexCreateZoneOperator(GoogleCloudBaseOperator):
+ """
+ Creates a zone resource within a lake.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param lake_id: Required. The ID of the Google Cloud lake that the task
belongs to.
+ :param zone: Required. The Request body contains an instance of Task.
+ :param zone_id: Required. Task identifier.
+ :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Zone
+ """
+
+ template_fields = (
+ "project_id",
+ "zone_id",
+ "zone",
+ "lake_id",
+ "impersonation_chain",
+ )
+ template_fields_renderers = {"zone": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ zone: dict[str, Any],
+ zone_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.lake_id = lake_id
+ self.zone = zone
+ self.zone_id = zone_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+ self.log.info("Creating Dataplex zone %s", self.zone_id)
+
+ operation = hook.create_zone(
Review Comment:
Will we get an error if such a zone already exists? Should we cover such cases?
##########
airflow/providers/google/cloud/operators/dataplex.py:
##########
@@ -610,3 +611,694 @@ def execute(self, context: Context) -> None:
DataplexLakeLink.persist(context=context, task_instance=self)
hook.wait_for_operation(timeout=self.timeout, operation=operation)
self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
+
+
class DataplexCreateDataQualityScanOperator(GoogleCloudBaseOperator):
    """
    Creates a DataScan resource.

    The operator submits the create request through ``DataplexHook`` and then waits
    on the returned operation before finishing.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param data_scan: Required. DataScan resource.
    :param data_scan_id: Required. DataScan identifier.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: Dataplex data scan id
    """

    template_fields = ("project_id", "data_scan_id", "data_scan", "impersonation_chain")
    template_fields_renderers = {"data_scan": "json"}

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        data_scan: dict[str, Any],
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Connection settings used to build the hook.
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self.impersonation_chain = impersonation_chain
        # Location and definition of the data scan to create.
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        self.data_scan = data_scan
        # Per-request call options.
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata

    def execute(self, context: Context):
        """Create the data scan, wait on the returned operation, and return the scan id."""
        dataplex = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        scan_id = self.data_scan_id

        self.log.info("Creating Dataplex data scan %s", scan_id)
        pending = dataplex.create_data_scan(
            project_id=self.project_id,
            region=self.region,
            data_scan_id=scan_id,
            data_scan=self.data_scan,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        dataplex.wait_for_operation(timeout=self.timeout, operation=pending)
        self.log.info("Dataplex scan data %s created successfully!", scan_id)
        return scan_id
+
+
class DataplexDeleteDataQualityScanOperator(GoogleCloudBaseOperator):
    """
    Deletes a DataScan resource.

    The operator submits the delete request through ``DataplexHook`` and then waits
    on the returned operation before finishing.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param data_scan_id: Required. Data scan identifier.
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).

    :return: None
    """

    template_fields = ("project_id", "data_scan_id", "impersonation_chain")

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Connection settings used to build the hook.
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        self.impersonation_chain = impersonation_chain
        # Location of the data scan to delete.
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        # Per-request call options.
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata

    def execute(self, context: Context) -> None:
        """Delete the data scan and wait on the returned operation."""
        dataplex = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        scan_id = self.data_scan_id

        self.log.info("Deleting Dataplex data scan job: %s", scan_id)
        pending = dataplex.delete_data_scan(
            project_id=self.project_id,
            region=self.region,
            data_scan_id=scan_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        dataplex.wait_for_operation(timeout=self.timeout, operation=pending)
        self.log.info("Dataplex data_scan %s deleted successfully!", scan_id)
+
+
+class DataplexRunDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Runs an on-demand execution of a DataScan.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan_id: Required. Data scan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :param asynchronous: Flag to return job_id from the Dataplex API.
+ This is useful for submitting long-running pipelines and
+ waiting on them asynchronously using the
DataplexDataQualityJobStatusSensor
+
+ :return: Dataplex data scan job id.
+ """
+
+ template_fields = ("project_id", "data_scan_id", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ asynchronous: bool = False,
+ failure_mode: bool = False,
+ *args,
+ **kwargs,
+ ) -> None:
+
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.asynchronous = asynchronous
+ self.failure_mode = failure_mode
Review Comment:
I couldn't find where this parameter (`failure_mode`) is actually used in the operator.
##########
airflow/providers/google/cloud/sensors/dataplex.py:
##########
@@ -114,3 +115,81 @@ def poke(self, context: Context) -> bool:
self.log.info("Current status of the Dataplex task %s => %s",
self.dataplex_task_id, task_status)
return task_status == TaskState.ACTIVE
+
+
class DataplexDataQualityJobStatusSensor(BaseSensorOperator):
    """
    Check the status of the Dataplex data scan job is SUCCEEDED.

    Each poke fetches the job through ``DataplexHook``. The ``FAILED`` and ``CANCELLED``
    states are treated as errors, so the sensor fails immediately instead of poking
    until its own timeout.

    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
    :param region: Required. The ID of the Google Cloud region that the task belongs to.
    :param data_scan_id: Required. DataScan identifier.
    :param job_id: Required. Job ID (templated).
    :param api_version: The version of the api that will be requested for example 'v1'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account.
    :return: Boolean indicating if the job run has reached the ``DataScanJob.State.SUCCEEDED``.
    :raises AirflowException: If the job is in the ``FAILED`` or ``CANCELLED`` state.
    """

    template_fields = ["job_id"]

    def __init__(
        self,
        project_id: str,
        region: str,
        data_scan_id: str,
        job_id: str,
        api_version: str = "v1",
        retry: Retry | _MethodDefault = DEFAULT,
        metadata: Sequence[tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.data_scan_id = data_scan_id
        self.job_id = job_id
        self.api_version = api_version
        self.retry = retry
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain

    def poke(self, context: Context) -> bool:
        """Return True once the job has SUCCEEDED; raise if it FAILED or was CANCELLED."""
        self.log.info("Waiting for job %s to be %s", self.job_id, DataScanJob.State.SUCCEEDED)

        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )

        job = hook.get_data_scan_job(
            project_id=self.project_id,
            region=self.region,
            data_scan_id=self.data_scan_id,
            job_id=self.job_id,
            retry=self.retry,
            metadata=self.metadata,
        )
        job_status = job.state

        # A failed or cancelled job cannot go on to succeed, so stop poking right away.
        if job_status == DataScanJob.State.FAILED:
            raise AirflowException(f"Data scan job failed: {self.job_id}")
        if job_status == DataScanJob.State.CANCELLED:
            raise AirflowException(f"Data scan job cancelled: {self.job_id}")

        self.log.info("Current status of the Dataplex data scan job %s => %s", self.job_id, job_status)

        return job_status == DataScanJob.State.SUCCEEDED
Review Comment:
What are the other possible DataScan job statuses? Are success and failure
the only ones that exist?
##########
airflow/providers/google/cloud/operators/dataplex.py:
##########
@@ -610,3 +611,694 @@ def execute(self, context: Context) -> None:
DataplexLakeLink.persist(context=context, task_instance=self)
hook.wait_for_operation(timeout=self.timeout, operation=operation)
self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
+
+
+class DataplexCreateDataQualityScanOperator(GoogleCloudBaseOperator):
+ """
+ Creates a DataScan resource.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
lake belongs to.
+ :param region: Required. The ID of the Google Cloud region that the lake
belongs to.
+ :param data_scan: Required. DataScan resource.
+ :param data_scan_id: Required. DataScan identifier.
+ :param api_version: The version of the api that will be requested for
example 'v1'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete.
+ Note that if `retry` is specified, the timeout applies to each
individual attempt.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+
+ :return: Dataplex data scan id
+ """
+
+ template_fields = ("project_id", "data_scan_id", "data_scan",
"impersonation_chain")
+ template_fields_renderers = {"data_scan": "json"}
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ data_scan_id: str,
+ data_scan: dict[str, Any],
+ api_version: str = "v1",
+ retry: Retry | _MethodDefault = DEFAULT,
+ timeout: float | None = None,
+ metadata: Sequence[tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.data_scan_id = data_scan_id
+ self.data_scan = data_scan
+ self.api_version = api_version
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context):
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ self.log.info("Creating Dataplex data scan %s", self.data_scan_id)
+ operation = hook.create_data_scan(
Review Comment:
Will we get an error if a data scan with the specified parameters already
exists (e.g., a 409)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]