This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2e2d4bffd5 Dataform operators (#25587)
2e2d4bffd5 is described below
commit 2e2d4bffd53a94fda04e7d88843545f7070b6f32
Author: Łukasz Wyszomirski <[email protected]>
AuthorDate: Tue Aug 9 17:10:07 2022 +0200
Dataform operators (#25587)
---
airflow/providers/google/cloud/hooks/dataform.py | 252 +++++++++++++
airflow/providers/google/cloud/links/dataform.py | 59 +++
.../providers/google/cloud/operators/dataform.py | 411 +++++++++++++++++++++
airflow/providers/google/cloud/sensors/dataform.py | 108 ++++++
airflow/providers/google/provider.yaml | 16 +
.../operators/cloud/dataform.rst | 110 ++++++
docs/spelling_wordlist.txt | 1 +
generated/provider_dependencies.json | 1 +
.../providers/google/cloud/hooks/test_dataform.py | 157 ++++++++
.../google/cloud/operators/test_dataform.py | 176 +++++++++
.../providers/google/cloud/dataform/__init__.py | 16 +
.../google/cloud/dataform/example_dataform.py | 173 +++++++++
12 files changed, 1480 insertions(+)
diff --git a/airflow/providers/google/cloud/hooks/dataform.py b/airflow/providers/google/cloud/hooks/dataform.py
new file mode 100644
index 0000000000..7e666486c5
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/dataform.py
@@ -0,0 +1,252 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.api_core.retry import Retry
+from google.cloud.dataform_v1beta1 import DataformClient
+from google.cloud.dataform_v1beta1.types import CompilationResult, WorkflowInvocation
+
+from airflow import AirflowException
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class DataformHook(GoogleBaseHook):
+ """Hook for Google Cloud DataForm APIs."""
+
+ def get_dataform_client(
+ self,
+ ) -> DataformClient:
+ """Retrieves client library object that allow access to Cloud Dataform
service."""
+ return DataformClient(credentials=self._get_credentials())
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def wait_for_workflow_invocation(
+ self,
+ workflow_invocation_id: str,
+ repository_id: str,
+ project_id: str,
+ region: str,
+ wait_time: int = 10,
+ timeout: Optional[int] = None,
+ ) -> None:
+ """
+ Helper method which polls a job to check if it finishes.
+
+ :param workflow_invocation_id: Id of the Workflow Invocation
+ :param repository_id: Id of the Dataform repository
+ :param project_id: Required. The ID of the Google Cloud project the workflow invocation belongs to.
+ :param region: Required. The ID of the Google Cloud region in which to handle the request.
+ :param wait_time: Number of seconds between checks
+ :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+ """
+ if region is None:
+ raise TypeError("missing 1 required keyword argument: 'region'")
+ state = None
+ start = time.monotonic()
+ while state not in (
+ WorkflowInvocation.State.FAILED,
+ WorkflowInvocation.State.SUCCEEDED,
+ WorkflowInvocation.State.CANCELLED,
+ ):
+ if timeout and start + timeout < time.monotonic():
+ raise AirflowException(
+ f"Timeout: workflow invocation {workflow_invocation_id} is
not ready after {timeout}s"
+ )
+ time.sleep(wait_time)
+ try:
+ workflow_invocation = self.get_workflow_invocation(
+ project_id=project_id,
+ region=region,
+ repository_id=repository_id,
+ workflow_invocation_id=workflow_invocation_id,
+ )
+ state = workflow_invocation.state
+ except Exception as err:
+ self.log.info(
+ "Retrying. Dataform API returned error when waiting for
workflow invocation: %s", err
+ )
+
+ if state == WorkflowInvocation.State.FAILED:
+ raise AirflowException(f'Workflow Invocation failed:\n{workflow_invocation}')
+ if state == WorkflowInvocation.State.CANCELLED:
+ raise AirflowException(f'Workflow Invocation was cancelled:\n{workflow_invocation}')
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_compilation_result(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ compilation_result: Union[CompilationResult, Dict],
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> CompilationResult:
+ """
+ Creates a new CompilationResult in a given project and location.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param compilation_result: Required. The compilation result to create.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ """
+ client = self.get_dataform_client()
+ parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}"
+ return client.create_compilation_result(
+ request={
+ "parent": parent,
+ "compilation_result": compilation_result,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_compilation_result(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ compilation_result_id: str,
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> CompilationResult:
+ """
+ Fetches a single CompilationResult.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param compilation_result_id: The Id of the Dataform Compilation Result
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ """
+ client = self.get_dataform_client()
+ name = (
+ f"projects/{project_id}/locations/{region}/repositories/"
+ f"{repository_id}/compilationResults/{compilation_result_id}"
+ )
+ return client.get_compilation_result(
+ request={"name": name}, retry=retry, timeout=timeout,
metadata=metadata
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_workflow_invocation(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation: Union[WorkflowInvocation, Dict],
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> WorkflowInvocation:
+ """
+ Creates a new WorkflowInvocation in a given Repository.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param workflow_invocation: Required. The workflow invocation resource to create.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ """
+ client = self.get_dataform_client()
+ parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}"
+ return client.create_workflow_invocation(
+ request={"parent": parent, "workflow_invocation":
workflow_invocation},
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_workflow_invocation(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation_id: str,
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ) -> WorkflowInvocation:
+ """
+ Fetches a single WorkflowInvocation.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param workflow_invocation_id: Required. The workflow invocation resource's id.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ """
+ client = self.get_dataform_client()
+ name = (
+ f"projects/{project_id}/locations/{region}/repositories/"
+ f"{repository_id}/workflowInvocations/{workflow_invocation_id}"
+ )
+ return client.get_workflow_invocation(
+ request={
+ "name": name,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def cancel_workflow_invocation(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation_id: str,
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ):
+ """
+ Requests cancellation of a running WorkflowInvocation.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param workflow_invocation_id: Required. The workflow invocation resource's id.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ """
+ client = self.get_dataform_client()
+ name = (
+ f"projects/{project_id}/locations/{region}/repositories/"
+ f"{repository_id}/workflowInvocations/{workflow_invocation_id}"
+ )
+ client.cancel_workflow_invocation(
+ request={"name": name}, retry=retry, timeout=timeout,
metadata=metadata
+ )
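
For orientation, here is a minimal, hypothetical sketch of calling the new hook directly; the connection id, project, region, repository and compilation result name below are placeholders, not values from this commit:

    from airflow.providers.google.cloud.hooks.dataform import DataformHook

    # Placeholder identifiers; substitute your own resources.
    hook = DataformHook(gcp_conn_id="google_cloud_default")
    invocation = hook.create_workflow_invocation(
        project_id="my-project",
        region="us-central1",
        repository_id="my-repository",
        workflow_invocation={"compilation_result": "<full compilation result resource name>"},
    )
    # Blocks until the invocation reaches a terminal state, polling every 10 seconds by default.
    hook.wait_for_workflow_invocation(
        workflow_invocation_id=invocation.name.split("/")[-1],
        repository_id="my-repository",
        project_id="my-project",
        region="us-central1",
    )
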
diff --git a/airflow/providers/google/cloud/links/dataform.py b/airflow/providers/google/cloud/links/dataform.py
new file mode 100644
index 0000000000..d312848185
--- /dev/null
+++ b/airflow/providers/google/cloud/links/dataform.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Dataflow links."""
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+DATAFORM_BASE_LINK = "https://pantheon.corp.google.com/bigquery/dataform"
+DATAFORM_WORKFLOW_INVOCATION_LINK = (
+ DATAFORM_BASE_LINK
+ + "/locations/{region}/repositories/{repository_id}/workflows/"
+ + "{workflow_invocation_id}?project={project_id}"
+)
+
+
+class DataformWorkflowInvocationLink(BaseGoogleLink):
+ """Helper class for constructing Dataflow Job Link"""
+
+ name = "Dataform Workflow Invocation"
+ key = "dataform_workflow_invocation_config"
+ format_str = DATAFORM_WORKFLOW_INVOCATION_LINK
+
+ @staticmethod
+ def persist(
+ operator_instance: BaseOperator,
+ context: "Context",
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation_id: str,
+ ):
+ operator_instance.xcom_push(
+ context,
+ key=DataformWorkflowInvocationLink.key,
+ value={
+ "project_id": project_id,
+ "region": region,
+ "repository_id": repository_id,
+ "workflow_invocation_id": workflow_invocation_id,
+ },
+ )
diff --git a/airflow/providers/google/cloud/operators/dataform.py b/airflow/providers/google/cloud/operators/dataform.py
new file mode 100644
index 0000000000..f06e594b6d
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataform.py
@@ -0,0 +1,411 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Dict, Optional, Sequence, Tuple, Union
+
+from airflow.providers.google.cloud.links.dataform import DataformWorkflowInvocationLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.api_core.retry import Retry
+from google.cloud.dataform_v1beta1.types import CompilationResult, WorkflowInvocation
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+
+
+class DataformCreateCompilationResultOperator(BaseOperator):
+ """
+ Creates a new CompilationResult in a given project and location.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param compilation_result: Required. The compilation result to create.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ """
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ compilation_result: Union[CompilationResult, Dict],
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.repository_id = repository_id
+ self.compilation_result = compilation_result
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: "Context"):
+ hook = DataformHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+ result = hook.create_compilation_result(
+ project_id=self.project_id,
+ region=self.region,
+ repository_id=self.repository_id,
+ compilation_result=self.compilation_result,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return CompilationResult.to_dict(result)
+
+
+class DataformGetCompilationResultOperator(BaseOperator):
+ """
+ Fetches a single CompilationResult.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param compilation_result_id: The Id of the Dataform Compilation Result
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ """
+
+ template_fields = ("repository_id", "compilation_result_id",
"delegate_to", "impersonation_chain")
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ compilation_result_id: str,
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.repository_id = repository_id
+ self.compilation_result_id = compilation_result_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: "Context"):
+ hook = DataformHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+ result = hook.get_compilation_result(
+ project_id=self.project_id,
+ region=self.region,
+ repository_id=self.repository_id,
+ compilation_result_id=self.compilation_result_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return CompilationResult.to_dict(result)
+
+
+class DataformCreateWorkflowInvocationOperator(BaseOperator):
+ """
+ Creates a new WorkflowInvocation in a given Repository.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param workflow_invocation: Required. The workflow invocation resource to create.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ :param asynchronous: Flag to return workflow_invocation_id from the Dataform API.
+ This is useful for submitting long-running workflows and
+ waiting on them asynchronously using the DataformWorkflowInvocationStateSensor
+ :param wait_time: Number of seconds between checks
+ """
+
+ template_fields = ("workflow_invocation", "delegate_to",
"impersonation_chain")
+ operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation: Union[WorkflowInvocation, Dict],
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[int] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ asynchronous: bool = False,
+ wait_time: int = 10,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.repository_id = repository_id
+ self.workflow_invocation = workflow_invocation
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+ self.asynchronous = asynchronous
+ self.wait_time = wait_time
+
+ def execute(self, context: "Context"):
+ hook = DataformHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+ result = hook.create_workflow_invocation(
+ project_id=self.project_id,
+ region=self.region,
+ repository_id=self.repository_id,
+ workflow_invocation=self.workflow_invocation,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ workflow_invocation_id = result.name.split("/")[-1]
+ DataformWorkflowInvocationLink.persist(
+ operator_instance=self,
+ context=context,
+ project_id=self.project_id,
+ region=self.region,
+ repository_id=self.repository_id,
+ workflow_invocation_id=workflow_invocation_id,
+ )
+ if not self.asynchronous:
+ hook.wait_for_workflow_invocation(
+ workflow_invocation_id=workflow_invocation_id,
+ repository_id=self.repository_id,
+ project_id=self.project_id,
+ region=self.region,
+ timeout=self.timeout,
+ wait_time=self.wait_time,
+ )
+ return WorkflowInvocation.to_dict(result)
+
+
+class DataformGetWorkflowInvocationOperator(BaseOperator):
+ """
+ Fetches a single WorkflowInvocation.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param workflow_invocation_id: The workflow invocation resource's id.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ """
+
+ template_fields = ("repository_id", "workflow_invocation_id",
"delegate_to", "impersonation_chain")
+ operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation_id: str,
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.repository_id = repository_id
+ self.workflow_invocation_id = workflow_invocation_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: "Context"):
+ hook = DataformHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+ result = hook.get_workflow_invocation(
+ project_id=self.project_id,
+ region=self.region,
+ repository_id=self.repository_id,
+ workflow_invocation_id=self.workflow_invocation_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ return WorkflowInvocation.to_dict(result)
+
+
+class DataformCancelWorkflowInvocationOperator(BaseOperator):
+ """
+ Requests cancellation of a running WorkflowInvocation.
+
+ :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task belongs to.
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param workflow_invocation_id: The workflow invocation resource's id.
+ :param retry: Designation of what errors, if any, should be retried.
+ :param timeout: The timeout for this request.
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ """
+
+ template_fields = ("repository_id", "workflow_invocation_id",
"delegate_to", "impersonation_chain")
+ operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation_id: str,
+ retry: Union[Retry, _MethodDefault] = DEFAULT,
+ timeout: Optional[float] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.repository_id = repository_id
+ self.workflow_invocation_id = workflow_invocation_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: "Context"):
+ hook = DataformHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+ hook.cancel_workflow_invocation(
+ project_id=self.project_id,
+ region=self.region,
+ repository_id=self.repository_id,
+ workflow_invocation_id=self.workflow_invocation_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
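
Before the full system-test DAG at the end of this commit, a hedged sketch of chaining the two core operators above inside a DAG; the DAG id, project, region and repository are illustrative placeholders:

    from datetime import datetime

    from airflow import models
    from airflow.providers.google.cloud.operators.dataform import (
        DataformCreateCompilationResultOperator,
        DataformCreateWorkflowInvocationOperator,
    )

    with models.DAG("dataform_sketch", start_date=datetime(2022, 1, 1), catchup=False) as dag:
        # Compile the repository at the tip of "main" (placeholder resources).
        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id="my-project",
            region="us-central1",
            repository_id="my-repository",
            compilation_result={"git_commitish": "main"},
        )
        # Run the compiled workflow; blocks until a terminal state unless asynchronous=True.
        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id="create_workflow_invocation",
            project_id="my-project",
            region="us-central1",
            repository_id="my-repository",
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
            },
        )
        create_compilation_result >> create_workflow_invocation
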
diff --git a/airflow/providers/google/cloud/sensors/dataform.py b/airflow/providers/google/cloud/sensors/dataform.py
new file mode 100644
index 0000000000..0df7350ccd
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/dataform.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Dataform sensor."""
+from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class DataformWorkflowInvocationStateSensor(BaseSensorOperator):
+ """
+ Checks for the status of a Workflow Invocation in Google Cloud Dataform.
+
+ :param project_id: Required, the Google Cloud project ID in which to start a job.
+ If set to None or missing, the default project_id from the Google Cloud connection is used.
+ :param region: Required, the location of the Dataform workflow invocation (for example europe-west1).
+ :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+ :param workflow_invocation_id: Required, ID of the workflow invocation to be checked.
+ :param expected_statuses: The expected state of the operation.
+ See:
+ https://cloud.google.com/python/docs/reference/dataform/latest/google.cloud.dataform_v1beta1.types.WorkflowInvocation.State
+ :param failure_statuses: State that will terminate the sensor with an exception
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+ if any. For this to work, the service account making the request must have
+ domain-wide delegation enabled. See:
+ https://developers.google.com/identity/protocols/oauth2/service-account#delegatingauthority
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ """
+
+ template_fields: Sequence[str] = ('workflow_invocation_id',)
+
+ def __init__(
+ self,
+ *,
+ project_id: str,
+ region: str,
+ repository_id: str,
+ workflow_invocation_id: str,
+ expected_statuses: Union[Set[int], int],
+ failure_statuses: Optional[Iterable[int]] = None,
+ gcp_conn_id: str = 'google_cloud_default',
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.repository_id = repository_id
+ self.workflow_invocation_id = workflow_invocation_id
+ self.expected_statuses = (
+ {expected_statuses} if isinstance(expected_statuses, int) else expected_statuses
+ )
+ self.failure_statuses = failure_statuses
+ self.project_id = project_id
+ self.region = region
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+ self.hook: Optional[DataformHook] = None
+
+ def poke(self, context: 'Context') -> bool:
+ self.hook = DataformHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ workflow_invocation = self.hook.get_workflow_invocation(
+ project_id=self.project_id,
+ region=self.region,
+ repository_id=self.repository_id,
+ workflow_invocation_id=self.workflow_invocation_id,
+ )
+ workflow_status = workflow_invocation.state
+ if workflow_status is not None:
+ if self.failure_statuses and workflow_status in self.failure_statuses:
+ raise AirflowException(
+ f"Workflow Invocation with id '{self.workflow_invocation_id}' "
+ f"state is: {workflow_status}. Terminating sensor..."
+ )
+
+ return workflow_status in self.expected_statuses
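
The failure_statuses parameter of the sensor is not exercised by the example DAG later in this commit; a hedged sketch of how it might be used inside a DAG block (all ids are placeholders):

    from google.cloud.dataform_v1beta1 import WorkflowInvocation

    from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

    # Succeeds when the invocation reaches SUCCEEDED; raises immediately on FAILED or
    # CANCELLED instead of poking until the sensor times out.
    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id="my-project",
        region="us-central1",
        repository_id="my-repository",
        workflow_invocation_id="{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}",
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
        failure_statuses={WorkflowInvocation.State.FAILED, WorkflowInvocation.State.CANCELLED},
    )
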
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index c3589140aa..c4ba4de716 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -74,6 +74,7 @@ dependencies:
- google-cloud-bigtable>=1.0.0,<2.0.0
- google-cloud-build>=3.0.0
- google-cloud-container>=2.2.0,<3.0.0
+ - google-cloud-dataform>=0.2.0
- google-cloud-datacatalog>=3.0.0
- google-cloud-dataplex>=0.1.0
- google-cloud-dataproc>=3.1.0
@@ -157,6 +158,11 @@ integrations:
how-to-guide:
- /docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
tags: [google]
+ - integration-name: Google Cloud Dataform
+ external-doc-url: https://cloud.google.com/dataform/
+ how-to-guide:
+ - /docs/apache-airflow-providers-google/operators/cloud/dataform.rst
+ tags: [google]
- integration-name: Google Cloud Data Loss Prevention (DLP)
external-doc-url: https://cloud.google.com/dlp/
how-to-guide:
@@ -567,6 +573,9 @@ operators:
- integration-name: Google Looker
python-modules:
- airflow.providers.google.cloud.operators.looker
+ - integration-name: Google Cloud Dataform
+ python-modules:
+ - airflow.providers.google.cloud.operators.dataform
sensors:
- integration-name: Google BigQuery
@@ -617,6 +626,9 @@ sensors:
- integration-name: Google Looker
python-modules:
- airflow.providers.google.cloud.sensors.looker
+ - integration-name: Google Cloud Dataform
+ python-modules:
+ - airflow.providers.google.cloud.sensors.dataform
hooks:
- integration-name: Google Ads
@@ -785,6 +797,9 @@ hooks:
- integration-name: Google Looker
python-modules:
- airflow.providers.google.cloud.hooks.looker
+ - integration-name: Google Cloud Dataform
+ python-modules:
+ - airflow.providers.google.cloud.hooks.dataform
transfers:
- source-integration-name: Presto
@@ -930,6 +945,7 @@ extra-links:
- airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
- airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
- airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
+ - airflow.providers.google.cloud.links.dataform.DataformWorkflowInvocationLink
- airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataform.rst b/docs/apache-airflow-providers-google/operators/cloud/dataform.rst
new file mode 100644
index 0000000000..de9e5871a0
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataform.rst
@@ -0,0 +1,110 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Dataform Operators
+=========================
+
+Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL
+workflows for data transformation in BigQuery.
+
+Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT)
+process for data integration. After raw data is extracted from source systems and loaded into
+BigQuery, Dataform helps you to transform it into a well-defined, tested, and documented suite of
+data tables.
+
+For more information about the task visit the `Dataform product documentation <https://cloud.google.com/dataform/docs/reference>`__.
+
+
+Configuration
+-------------
+
+Before you can use the Dataform operators you need to initialize a repository and a workspace; for more
+information visit the `Dataform product documentation <https://cloud.google.com/dataform/docs/reference>`__.
+
+
+Create Compilation Result
+-------------------------
+A simple configuration to create a Compilation Result can look as follows:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCreateCompilationResultOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_operator_create_compilation_result]
+ :end-before: [END howto_operator_create_compilation_result]
+
+Get Compilation Result
+----------------------
+
+To get a Compilation Result you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformGetCompilationResultOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_get_compilation_result]
+ :end-before: [END howto_operator_get_compilation_result]
+
+Create Workflow Invocation
+--------------------------
+
+To create a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCreateWorkflowInvocationOperator`
+
+This operation can be run in synchronous or asynchronous mode; for the asynchronous mode there is also
+a sensor:
+:class:`~airflow.providers.google.cloud.sensors.dataform.DataformWorkflowInvocationStateSensor`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_create_workflow_invocation]
+ :end-before: [END howto_operator_create_workflow_invocation]
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_create_workflow_invocation_async]
+ :end-before: [END howto_operator_create_workflow_invocation_async]
+
+Get Workflow Invocation
+-----------------------
+
+To get a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformGetWorkflowInvocationOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_get_workflow_invocation]
+ :end-before: [END howto_operator_get_workflow_invocation]
+
+Cancel Workflow Invocation
+--------------------------
+
+To cancel a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCancelWorkflowInvocationOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_cancel_workflow_invocation]
+ :end-before: [END howto_operator_cancel_workflow_invocation]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index e4d2c98602..a06292dc9f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -99,6 +99,7 @@ DataTransferServiceClient
Databricks
Datadog
Dataflow
+Dataform
Dataframe
Datalake
Datanodes
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index dd06841df9..e65ee42833 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -306,6 +306,7 @@
"google-cloud-build>=3.0.0",
"google-cloud-container>=2.2.0,<3.0.0",
"google-cloud-datacatalog>=3.0.0",
+ "google-cloud-dataform>=0.2.0",
"google-cloud-dataplex>=0.1.0",
"google-cloud-dataproc-metastore>=1.2.0,<2.0.0",
"google-cloud-dataproc>=3.1.0",
diff --git a/tests/providers/google/cloud/hooks/test_dataform.py b/tests/providers/google/cloud/hooks/test_dataform.py
new file mode 100644
index 0000000000..4312b550e6
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_dataform.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from unittest import mock
+
+from google.api_core.gapic_v1.method import DEFAULT
+
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+DATAFORM_STRING = "airflow.providers.google.cloud.hooks.dataform.{}"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+REPOSITORY_ID = "test_repository"
+WORKSPACE_ID = "test_workspace"
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+COMPILATION_RESULT = {
+ "git_commitish": "main",
+ "workspace": (
+ f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/workspaces/{WORKSPACE_ID}"
+ ),
+}
+COMPILATION_RESULT_ID = "test_compilation_result_id"
+WORKFLOW_INVOCATION = {
+ "compilation_result": (
+ f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+ f"{REPOSITORY_ID}/compilationResults/{COMPILATION_RESULT_ID}"
+ ),
+}
+WORKFLOW_INVOCATION_ID = "test_workflow_invocation_id"
+
+
+class TestDataformHook(unittest.TestCase):
+ def setUp(self):
+ with mock.patch(
+ BASE_STRING.format("GoogleBaseHook.__init__"),
+ new=mock_base_gcp_hook_default_project_id,
+ ):
+ self.hook = DataformHook(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+ def test_create_compilation_result(self, mock_client):
+ self.hook.create_compilation_result(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ compilation_result=COMPILATION_RESULT,
+ )
+ parent = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}"
+ mock_client.return_value.create_compilation_result.assert_called_once_with(
+ request=dict(parent=parent, compilation_result=COMPILATION_RESULT),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAFORM_STRING.format("DataformHook.get_compilation_result"))
+ def get_compilation_result(self, mock_client):
+ self.hook.create_compilation_result(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ )
+ name = (
+ f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+ f"{REPOSITORY_ID}/compilationResults/{COMPILATION_RESULT_ID}"
+ )
+ mock_client.return_value.get_compilation_result.assert_called_once_with(
+ request=dict(
+ name=name,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+ def test_create_workflow_invocation(self, mock_client):
+ self.hook.create_workflow_invocation(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation=WORKFLOW_INVOCATION,
+ )
+ parent = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}"
+ mock_client.return_value.create_workflow_invocation.assert_called_once_with(
+ request=dict(parent=parent, workflow_invocation=WORKFLOW_INVOCATION),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+ def test_get_workflow_invocation(self, mock_client):
+ self.hook.get_workflow_invocation(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+ )
+ name = (
+ f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+ f"{REPOSITORY_ID}/workflowInvocations/{WORKFLOW_INVOCATION_ID}"
+ )
+ mock_client.return_value.get_workflow_invocation.assert_called_once_with(
+ request=dict(
+ name=name,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+ def test_cancel_workflow_invocation(self, mock_client):
+ self.hook.cancel_workflow_invocation(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+ )
+ name = (
+ f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+ f"{REPOSITORY_ID}/workflowInvocations/{WORKFLOW_INVOCATION_ID}"
+ )
+ mock_client.return_value.cancel_workflow_invocation.assert_called_once_with(
+ request=dict(
+ name=name,
+ ),
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
diff --git a/tests/providers/google/cloud/operators/test_dataform.py b/tests/providers/google/cloud/operators/test_dataform.py
new file mode 100644
index 0000000000..e3d4c1c22c
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataform.py
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import TestCase, mock
+
+from google.api_core.gapic_v1.method import DEFAULT
+
+from airflow.providers.google.cloud.operators.dataform import (
+ DataformCancelWorkflowInvocationOperator,
+ DataformCreateCompilationResultOperator,
+ DataformCreateWorkflowInvocationOperator,
+ DataformGetCompilationResultOperator,
+ DataformGetWorkflowInvocationOperator,
+)
+
+HOOK_STR = "airflow.providers.google.cloud.operators.dataform.DataformHook"
+WORKFLOW_INVOCATION_STR = "airflow.providers.google.cloud.operators.dataform.WorkflowInvocation"
+COMPILATION_RESULT_STR = "airflow.providers.google.cloud.operators.dataform.CompilationResult"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+REPOSITORY_ID = "test_repository_id"
+WORKSPACE_ID = "test_workspace_id"
+WORKSPACE = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/workspaces/{WORKSPACE_ID}"
+COMPILATION_RESULT_ID = "test_compilation_result_id"
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+WORKFLOW_INVOCATION = {
+ "compilation_result": (
+ f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
+ f"compilationResults/{COMPILATION_RESULT_ID}"
+ )
+}
+WORKFLOW_INVOCATION_ID = "test_workflow_invocation"
+
+
+class TestDataformCreateCompilationResult(unittest.TestCase):
+ @mock.patch(HOOK_STR)
+ @mock.patch(COMPILATION_RESULT_STR)
+ def test_execute(self, compilation_result_mock, hook_mock):
+ compilation_result = {
+ "git_commitish": "main",
+ "workspace": WORKSPACE,
+ }
+ op = DataformCreateCompilationResultOperator(
+ task_id="create_compilation_result",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ compilation_result=compilation_result,
+ )
+ compilation_result_mock.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+ hook_mock.return_value.create_compilation_result.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ compilation_result=compilation_result,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataformGetCompilationResultOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ @mock.patch(COMPILATION_RESULT_STR)
+ def test_execute(self, compilation_result_mock, hook_mock):
+ op = DataformGetCompilationResultOperator(
+ task_id="get_compilation_result",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ compilation_result_id=COMPILATION_RESULT_ID,
+ )
+ compilation_result_mock.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+ hook_mock.return_value.get_compilation_result.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ compilation_result_id=COMPILATION_RESULT_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataformCreateWorkflowInvocationOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ @mock.patch(WORKFLOW_INVOCATION_STR)
+ def test_execute(self, workflow_invocation_str, hook_mock):
+ op = DataformCreateWorkflowInvocationOperator(
+ task_id="create_workflow_invocation",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation=WORKFLOW_INVOCATION,
+ )
+ workflow_invocation_str.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+
+ hook_mock.return_value.create_workflow_invocation.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation=WORKFLOW_INVOCATION,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataformGetWorkflowInvocationOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ @mock.patch(WORKFLOW_INVOCATION_STR)
+ def test_execute(self, workflow_invocation_str, hook_mock):
+ op = DataformGetWorkflowInvocationOperator(
+ task_id="get_workflow_invocation",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+ )
+
+ workflow_invocation_str.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+
+ hook_mock.return_value.get_workflow_invocation.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataformCancelWorkflowInvocationOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ def test_execute(self, hook_mock):
+ op = DataformCancelWorkflowInvocationOperator(
+ task_id="cancel_workflow_invocation",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+ )
+ op.execute(context=mock.MagicMock())
+ hook_mock.return_value.cancel_workflow_invocation.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+ retry=DEFAULT,
+ timeout=None,
+ metadata=(),
+ )
diff --git a/tests/system/providers/google/cloud/dataform/__init__.py b/tests/system/providers/google/cloud/dataform/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataform/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/dataform/example_dataform.py b/tests/system/providers/google/cloud/dataform/example_dataform.py
new file mode 100644
index 0000000000..d07d42898e
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataform/example_dataform.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataform service
+"""
+import os
+from datetime import datetime
+
+from google.cloud.dataform_v1beta1 import WorkflowInvocation
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.dataform import (
+ DataformCancelWorkflowInvocationOperator,
+ DataformCreateCompilationResultOperator,
+ DataformCreateWorkflowInvocationOperator,
+ DataformGetCompilationResultOperator,
+ DataformGetWorkflowInvocationOperator,
+)
+from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "test-project-id")
+
+DAG_ID = "dataform"
+
+REPOSITORY_ID = "dataform-test2"
+REGION = "us-central1"
+WORKSPACE_ID = "testing"
+
+# This DAG is not self-runnable; some extra configuration is needed to execute it in an automated process
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once', # Override to match your needs
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=['example', 'dataform'],
+) as dag:
+ # [START howto_operator_create_compilation_result]
+ create_compilation_result = DataformCreateCompilationResultOperator(
+ task_id="create_compilation_result",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ compilation_result={
+ "git_commitish": "main",
+ "workspace": (
+ f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
+ f"workspaces/{WORKSPACE_ID}"
+ ),
+ },
+ )
+ # [END howto_operator_create_compilation_result]
+
+ # [START howto_operator_get_compilation_result]
+ get_compilation_result = DataformGetCompilationResultOperator(
+ task_id="get_compilation_result",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ compilation_result_id=(
+ "{{
task_instance.xcom_pull('create_compilation_result')['name'].split('/')[-1] }}"
+ ),
+ )
+ # [END howto_operator_get_compilation_result]
+
+ # [START howto_operator_create_workflow_invocation]
+ create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
+ task_id='create_workflow_invocation',
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation={
+ "compilation_result": "{{
task_instance.xcom_pull('create_compilation_result')['name'] }}"
+ },
+ )
+ # [END howto_operator_create_workflow_invocation]
+
+ # [START howto_operator_create_workflow_invocation_async]
+ create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
+ task_id='create_workflow_invocation_async',
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ asynchronous=True,
+ workflow_invocation={
+ "compilation_result": "{{
task_instance.xcom_pull('create_compilation_result')['name'] }}"
+ },
+ )
+
+ is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
+ task_id="is_workflow_invocation_done",
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=(
+ "{{
task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
+ ),
+ expected_statuses={WorkflowInvocation.State.SUCCEEDED},
+ )
+ # [END howto_operator_create_workflow_invocation_async]
+
+ # [START howto_operator_get_workflow_invocation]
+ get_workflow_invocation = DataformGetWorkflowInvocationOperator(
+ task_id='get_workflow_invocation',
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=(
+ "{{
task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
+ ),
+ )
+ # [END howto_operator_get_workflow_invocation]
+
+ create_second_workflow_invocation = DataformCreateWorkflowInvocationOperator(
+ task_id='create_second_workflow_invocation',
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation={
+ "compilation_result": "{{
task_instance.xcom_pull('create_compilation_result')['name'] }}"
+ },
+ )
+
+ # [START howto_operator_cancel_workflow_invocation]
+ cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
+ task_id='cancel_workflow_invocation',
+ project_id=PROJECT_ID,
+ region=REGION,
+ repository_id=REPOSITORY_ID,
+ workflow_invocation_id=(
+ "{{
task_instance.xcom_pull('create_second_workflow_invocation')['name'].split('/')[-1]
}}"
+ ),
+ )
+ # [END howto_operator_cancel_workflow_invocation]
+
+ chain(
+ create_compilation_result,
+ get_compilation_result,
+ create_workflow_invocation >> get_workflow_invocation,
+ create_workflow_invocation_async >> is_workflow_invocation_done,
+ create_second_workflow_invocation >> cancel_workflow_invocation,
+ )
+
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)