This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e2d4bffd5 Dataform operators (#25587)
2e2d4bffd5 is described below

commit 2e2d4bffd53a94fda04e7d88843545f7070b6f32
Author: Ɓukasz Wyszomirski <[email protected]>
AuthorDate: Tue Aug 9 17:10:07 2022 +0200

    Dataform operators (#25587)
---
 airflow/providers/google/cloud/hooks/dataform.py   | 252 +++++++++++++
 airflow/providers/google/cloud/links/dataform.py   |  59 +++
 .../providers/google/cloud/operators/dataform.py   | 411 +++++++++++++++++++++
 airflow/providers/google/cloud/sensors/dataform.py | 108 ++++++
 airflow/providers/google/provider.yaml             |  16 +
 .../operators/cloud/dataform.rst                   | 110 ++++++
 docs/spelling_wordlist.txt                         |   1 +
 generated/provider_dependencies.json               |   1 +
 .../providers/google/cloud/hooks/test_dataform.py  | 157 ++++++++
 .../google/cloud/operators/test_dataform.py        | 176 +++++++++
 .../providers/google/cloud/dataform/__init__.py    |  16 +
 .../google/cloud/dataform/example_dataform.py      | 173 +++++++++
 12 files changed, 1480 insertions(+)

diff --git a/airflow/providers/google/cloud/hooks/dataform.py b/airflow/providers/google/cloud/hooks/dataform.py
new file mode 100644
index 0000000000..7e666486c5
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/dataform.py
@@ -0,0 +1,252 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.api_core.retry import Retry
+from google.cloud.dataform_v1beta1 import DataformClient
+from google.cloud.dataform_v1beta1.types import CompilationResult, WorkflowInvocation
+
+from airflow import AirflowException
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class DataformHook(GoogleBaseHook):
+    """Hook for Google Cloud DataForm APIs."""
+
+    def get_dataform_client(
+        self,
+    ) -> DataformClient:
+        """Retrieves client library object that allow access to Cloud Dataform 
service."""
+        return DataformClient(credentials=self._get_credentials())
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def wait_for_workflow_invocation(
+        self,
+        workflow_invocation_id: str,
+        repository_id: str,
+        project_id: str,
+        region: str,
+        wait_time: int = 10,
+        timeout: Optional[int] = None,
+    ) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param workflow_invocation_id: Id of the Workflow Invocation
+        :param repository_id: Id of the Dataform repository
+        :param project_id: Required. The ID of the Google Cloud project the workflow invocation belongs to.
+        :param region: Required. The Google Cloud region in which to handle the request.
+        :param wait_time: Number of seconds between checks.
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False.
+        """
+        if region is None:
+            raise TypeError("missing 1 required keyword argument: 'region'")
+        state = None
+        start = time.monotonic()
+        while state not in (
+            WorkflowInvocation.State.FAILED,
+            WorkflowInvocation.State.SUCCEEDED,
+            WorkflowInvocation.State.CANCELLED,
+        ):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(
+                    f"Timeout: workflow invocation {workflow_invocation_id} is 
not ready after {timeout}s"
+                )
+            time.sleep(wait_time)
+            try:
+                workflow_invocation = self.get_workflow_invocation(
+                    project_id=project_id,
+                    region=region,
+                    repository_id=repository_id,
+                    workflow_invocation_id=workflow_invocation_id,
+                )
+                state = workflow_invocation.state
+            except Exception as err:
+                self.log.info(
+                    "Retrying. Dataform API returned error when waiting for workflow invocation: %s", err
+                )
+
+        if state == WorkflowInvocation.State.FAILED:
+            raise AirflowException(f'Workflow Invocation failed:\n{workflow_invocation}')
+        if state == WorkflowInvocation.State.CANCELLED:
+            raise AirflowException(f'Workflow Invocation was cancelled:\n{workflow_invocation}')
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_compilation_result(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        compilation_result: Union[CompilationResult, Dict],
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> CompilationResult:
+        """
+        Creates a new CompilationResult in a given project and location.
+
+        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the task belongs to.
+        :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+        :param compilation_result:  Required. The compilation result to create.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_dataform_client()
+        parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}"
+        return client.create_compilation_result(
+            request={
+                "parent": parent,
+                "compilation_result": compilation_result,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_compilation_result(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        compilation_result_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> CompilationResult:
+        """
+        Fetches a single CompilationResult.
+
+        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the task belongs to.
+        :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+        :param compilation_result_id:  The ID of the Dataform compilation result.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_dataform_client()
+        name = (
+            f"projects/{project_id}/locations/{region}/repositories/"
+            f"{repository_id}/compilationResults/{compilation_result_id}"
+        )
+        return client.get_compilation_result(
+            request={"name": name}, retry=retry, timeout=timeout, 
metadata=metadata
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_workflow_invocation(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation: Union[WorkflowInvocation, Dict],
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> WorkflowInvocation:
+        """
+        Creates a new WorkflowInvocation in a given Repository.
+
+        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the task belongs to.
+        :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+        :param workflow_invocation:  Required. The workflow invocation resource to create.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_dataform_client()
+        parent = f"projects/{project_id}/locations/{region}/repositories/{repository_id}"
+        return client.create_workflow_invocation(
+            request={"parent": parent, "workflow_invocation": 
workflow_invocation},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_workflow_invocation(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> WorkflowInvocation:
+        """
+        Fetches a single WorkflowInvocation.
+
+        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the task belongs to.
+        :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+        :param workflow_invocation_id:  Required. The workflow invocation resource's id.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_dataform_client()
+        name = (
+            f"projects/{project_id}/locations/{region}/repositories/"
+            f"{repository_id}/workflowInvocations/{workflow_invocation_id}"
+        )
+        return client.get_workflow_invocation(
+            request={
+                "name": name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def cancel_workflow_invocation(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ):
+        """
+        Requests cancellation of a running WorkflowInvocation.
+
+        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the task belongs to.
+        :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+        :param workflow_invocation_id:  Required. The workflow invocation resource's id.
+        :param retry: Designation of what errors, if any, should be retried.
+        :param timeout: The timeout for this request.
+        :param metadata: Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_dataform_client()
+        name = (
+            f"projects/{project_id}/locations/{region}/repositories/"
+            f"{repository_id}/workflowInvocations/{workflow_invocation_id}"
+        )
+        client.cancel_workflow_invocation(
+            request={"name": name}, retry=retry, timeout=timeout, 
metadata=metadata
+        )
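
A minimal sketch of how the new hook could be exercised end to end. The project, region, repository,
and workspace values below are hypothetical placeholders; the method names and the `name` fields are
as defined in the hook above:

    from airflow.providers.google.cloud.hooks.dataform import DataformHook

    hook = DataformHook(gcp_conn_id="google_cloud_default")

    workspace = (  # hypothetical workspace resource name
        "projects/my-project/locations/us-central1/repositories/my-repository/workspaces/my-workspace"
    )
    # Compile the Dataform project pinned to a git commitish.
    compilation_result = hook.create_compilation_result(
        project_id="my-project",        # hypothetical
        region="us-central1",           # hypothetical
        repository_id="my-repository",  # hypothetical
        compilation_result={"git_commitish": "main", "workspace": workspace},
    )
    # Invoke the compiled workflow.
    invocation = hook.create_workflow_invocation(
        project_id="my-project",
        region="us-central1",
        repository_id="my-repository",
        workflow_invocation={"compilation_result": compilation_result.name},
    )
    # Poll until a terminal state; raises AirflowException on FAILED/CANCELLED or timeout.
    hook.wait_for_workflow_invocation(
        workflow_invocation_id=invocation.name.split("/")[-1],
        repository_id="my-repository",
        project_id="my-project",
        region="us-central1",
        wait_time=10,
    )
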
diff --git a/airflow/providers/google/cloud/links/dataform.py b/airflow/providers/google/cloud/links/dataform.py
new file mode 100644
index 0000000000..d312848185
--- /dev/null
+++ b/airflow/providers/google/cloud/links/dataform.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Dataflow links."""
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+DATAFORM_BASE_LINK = "https://pantheon.corp.google.com/bigquery/dataform";
+DATAFORM_WORKFLOW_INVOCATION_LINK = (
+    DATAFORM_BASE_LINK
+    + "/locations/{region}/repositories/{repository_id}/workflows/"
+    + "{workflow_invocation_id}?project={project_id}"
+)
+
+
+class DataformWorkflowInvocationLink(BaseGoogleLink):
+    """Helper class for constructing Dataflow Job Link"""
+
+    name = "Dataform Workflow Invocation"
+    key = "dataform_workflow_invocation_config"
+    format_str = DATAFORM_WORKFLOW_INVOCATION_LINK
+
+    @staticmethod
+    def persist(
+        operator_instance: BaseOperator,
+        context: "Context",
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+    ):
+        operator_instance.xcom_push(
+            context,
+            key=DataformWorkflowInvocationLink.key,
+            value={
+                "project_id": project_id,
+                "region": region,
+                "repository_id": repository_id,
+                "workflow_invocation_id": workflow_invocation_id,
+            },
+        )
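
For orientation, assuming the usual BaseGoogleLink behavior of formatting format_str with the dict
that persist pushed to XCom, the stored keys must match the placeholders in
DATAFORM_WORKFLOW_INVOCATION_LINK. A small illustration with hypothetical values:

    url = DATAFORM_WORKFLOW_INVOCATION_LINK.format(
        region="us-central1",             # hypothetical
        repository_id="my-repository",    # hypothetical
        workflow_invocation_id="abc123",  # hypothetical
        project_id="my-project",          # hypothetical
    )
    # -> https://pantheon.corp.google.com/bigquery/dataform/locations/us-central1
    #    /repositories/my-repository/workflows/abc123?project=my-project
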
diff --git a/airflow/providers/google/cloud/operators/dataform.py b/airflow/providers/google/cloud/operators/dataform.py
new file mode 100644
index 0000000000..f06e594b6d
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataform.py
@@ -0,0 +1,411 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import TYPE_CHECKING, Dict, Optional, Sequence, Tuple, Union
+
+from airflow.providers.google.cloud.links.dataform import DataformWorkflowInvocationLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.api_core.retry import Retry
+from google.cloud.dataform_v1beta1.types import CompilationResult, WorkflowInvocation
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+
+
+class DataformCreateCompilationResultOperator(BaseOperator):
+    """
+    Creates a new CompilationResult in a given project and location.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param compilation_result:  Required. The compilation result to create.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        compilation_result: Union[CompilationResult, Dict],
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.compilation_result = compilation_result
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.create_compilation_result(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            compilation_result=self.compilation_result,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return CompilationResult.to_dict(result)
+
+
+class DataformGetCompilationResultOperator(BaseOperator):
+    """
+    Fetches a single CompilationResult.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param compilation_result_id:  The ID of the Dataform compilation result.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("repository_id", "compilation_result_id", 
"delegate_to", "impersonation_chain")
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        compilation_result_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.compilation_result_id = compilation_result_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.get_compilation_result(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            compilation_result_id=self.compilation_result_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return CompilationResult.to_dict(result)
+
+
+class DataformCreateWorkflowInvocationOperator(BaseOperator):
+    """
+    Creates a new WorkflowInvocation in a given Repository.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation:  Required. The workflow invocation resource to create.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param asynchronous: Flag to return workflow_invocation_id from the Dataform API.
+        This is useful for submitting long-running workflows and
+        waiting on them asynchronously using the DataformWorkflowInvocationStateSensor.
+    :param wait_time: Number of seconds between checks.
+    """
+
+    template_fields = ("workflow_invocation", "delegate_to", 
"impersonation_chain")
+    operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation: Union[WorkflowInvocation, Dict],
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[int] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        asynchronous: bool = False,
+        wait_time: int = 10,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.workflow_invocation = workflow_invocation
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.asynchronous = asynchronous
+        self.wait_time = wait_time
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.create_workflow_invocation(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation=self.workflow_invocation,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        workflow_invocation_id = result.name.split("/")[-1]
+        DataformWorkflowInvocationLink.persist(
+            operator_instance=self,
+            context=context,
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation_id=workflow_invocation_id,
+        )
+        if not self.asynchronous:
+            hook.wait_for_workflow_invocation(
+                workflow_invocation_id=workflow_invocation_id,
+                repository_id=self.repository_id,
+                project_id=self.project_id,
+                region=self.region,
+                timeout=self.timeout,
+                wait_time=self.wait_time,
+            )
+        return WorkflowInvocation.to_dict(result)
+
+
+class DataformGetWorkflowInvocationOperator(BaseOperator):
+    """
+    Fetches a single WorkflowInvocation.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation_id:  The workflow invocation resource's id.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("repository_id", "workflow_invocation_id", 
"delegate_to", "impersonation_chain")
+    operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.workflow_invocation_id = workflow_invocation_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        result = hook.get_workflow_invocation(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation_id=self.workflow_invocation_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return WorkflowInvocation.to_dict(result)
+
+
+class DataformCancelWorkflowInvocationOperator(BaseOperator):
+    """
+    Requests cancellation of a running WorkflowInvocation.
+
+    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task belongs to.
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation_id:  The workflow invocation resource's id.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("repository_id", "workflow_invocation_id", 
"delegate_to", "impersonation_chain")
+    operator_extra_links = (DataformWorkflowInvocationLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+        retry: Union[Retry, _MethodDefault] = DEFAULT,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.repository_id = repository_id
+        self.workflow_invocation_id = workflow_invocation_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context"):
+        hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+        hook.cancel_workflow_invocation(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation_id=self.workflow_invocation_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
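
Taken together, the operators above compose into a short pipeline: compile first, then invoke the
compiled workflow. A hedged sketch of such a DAG; the DAG id, project, region, repository, and
workspace values are hypothetical, and workflow_invocation is a templated field, so the compilation
result name can be pulled from XCom:

    import datetime

    from airflow import models
    from airflow.providers.google.cloud.operators.dataform import (
        DataformCreateCompilationResultOperator,
        DataformCreateWorkflowInvocationOperator,
    )

    with models.DAG(
        "example_dataform_sketch",  # hypothetical DAG id
        start_date=datetime.datetime(2022, 8, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id="my-project",        # hypothetical
            region="us-central1",           # hypothetical
            repository_id="my-repository",  # hypothetical
            compilation_result={
                "git_commitish": "main",
                "workspace": (
                    "projects/my-project/locations/us-central1"
                    "/repositories/my-repository/workspaces/my-workspace"
                ),
            },
        )
        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id="create_workflow_invocation",
            project_id="my-project",
            region="us-central1",
            repository_id="my-repository",
            # The upstream task returns CompilationResult.to_dict(), which includes "name".
            workflow_invocation={
                "compilation_result": (
                    "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
                )
            },
        )

        create_compilation_result >> create_workflow_invocation
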
diff --git a/airflow/providers/google/cloud/sensors/dataform.py b/airflow/providers/google/cloud/sensors/dataform.py
new file mode 100644
index 0000000000..0df7350ccd
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/dataform.py
@@ -0,0 +1,108 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Dataform sensor."""
+from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class DataformWorkflowInvocationStateSensor(BaseSensorOperator):
+    """
+    Checks for the status of a Workflow Invocation in Google Cloud Dataform.
+
+    :param project_id: Required, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :param region: Required, the location of the Dataform workflow invocation (for example europe-west1).
+    :param repository_id: Required. The ID of the Dataform repository that the task belongs to.
+    :param workflow_invocation_id: Required, ID of the workflow invocation to be checked.
+    :param expected_statuses: The expected state of the operation.
+        See:
+        https://cloud.google.com/python/docs/reference/dataform/latest/google.cloud.dataform_v1beta1.types.WorkflowInvocation.State
+    :param failure_statuses: States that will terminate the sensor with an exception.
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled. See:
+        https://developers.google.com/identity/protocols/oauth2/service-account#delegatingauthority
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields: Sequence[str] = ('workflow_invocation_id',)
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        repository_id: str,
+        workflow_invocation_id: str,
+        expected_statuses: Union[Set[int], int],
+        failure_statuses: Optional[Iterable[int]] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.repository_id = repository_id
+        self.workflow_invocation_id = workflow_invocation_id
+        self.expected_statuses = (
+            {expected_statuses} if isinstance(expected_statuses, int) else expected_statuses
+        )
+        self.failure_statuses = failure_statuses
+        self.project_id = project_id
+        self.region = region
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.hook: Optional[DataformHook] = None
+
+    def poke(self, context: 'Context') -> bool:
+        self.hook = DataformHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        workflow_invocation = self.hook.get_workflow_invocation(
+            project_id=self.project_id,
+            region=self.region,
+            repository_id=self.repository_id,
+            workflow_invocation_id=self.workflow_invocation_id,
+        )
+        workflow_status = workflow_invocation.state
+        if workflow_status is not None:
+            if self.failure_statuses and workflow_status in self.failure_statuses:
+                raise AirflowException(
+                    f"Workflow Invocation with id '{self.workflow_invocation_id}' "
+                    f"state is: {workflow_status}. Terminating sensor..."
+                )
+
+        return workflow_status in self.expected_statuses
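
When DataformCreateWorkflowInvocationOperator is run with asynchronous=True, the sensor above can
take over the waiting. A hedged sketch, with hypothetical project, region, and repository values and
an assumed upstream task id of create_workflow_invocation:

    from google.cloud.dataform_v1beta1.types import WorkflowInvocation

    from airflow.providers.google.cloud.sensors.dataform import (
        DataformWorkflowInvocationStateSensor,
    )

    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
        task_id="is_workflow_invocation_done",
        project_id="my-project",        # hypothetical
        region="us-central1",           # hypothetical
        repository_id="my-repository",  # hypothetical
        # workflow_invocation_id is templated; the async create task's XCom "name"
        # ends with .../workflowInvocations/<id>, so take the last path segment.
        workflow_invocation_id=(
            "{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
        ),
        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
        failure_statuses={WorkflowInvocation.State.FAILED, WorkflowInvocation.State.CANCELLED},
    )
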
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index c3589140aa..c4ba4de716 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -74,6 +74,7 @@ dependencies:
   - google-cloud-bigtable>=1.0.0,<2.0.0
   - google-cloud-build>=3.0.0
   - google-cloud-container>=2.2.0,<3.0.0
+  - google-cloud-dataform>=0.2.0
   - google-cloud-datacatalog>=3.0.0
   - google-cloud-dataplex>=0.1.0
   - google-cloud-dataproc>=3.1.0
@@ -157,6 +158,11 @@ integrations:
     how-to-guide:
      - /docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
     tags: [google]
+  - integration-name: Google Cloud Dataform
+    external-doc-url: https://cloud.google.com/dataform/
+    how-to-guide:
+      - /docs/apache-airflow-providers-google/operators/cloud/dataform.rst
+    tags: [google]
   - integration-name: Google Cloud Data Loss Prevention (DLP)
     external-doc-url: https://cloud.google.com/dlp/
     how-to-guide:
@@ -567,6 +573,9 @@ operators:
   - integration-name: Google Looker
     python-modules:
       - airflow.providers.google.cloud.operators.looker
+  - integration-name: Google Cloud Dataform
+    python-modules:
+      - airflow.providers.google.cloud.operators.dataform
 
 sensors:
   - integration-name: Google BigQuery
@@ -617,6 +626,9 @@ sensors:
   - integration-name: Google Looker
     python-modules:
       - airflow.providers.google.cloud.sensors.looker
+  - integration-name: Google Cloud Dataform
+    python-modules:
+      - airflow.providers.google.cloud.sensors.dataform
 
 hooks:
   - integration-name: Google Ads
@@ -785,6 +797,9 @@ hooks:
   - integration-name: Google Looker
     python-modules:
       - airflow.providers.google.cloud.hooks.looker
+  - integration-name: Google Cloud Dataform
+    python-modules:
+      - airflow.providers.google.cloud.hooks.dataform
 
 transfers:
   - source-integration-name: Presto
@@ -930,6 +945,7 @@ extra-links:
   - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
   - airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
+  - airflow.providers.google.cloud.links.dataform.DataformWorkflowInvocationLink
   - airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink
   - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink
   - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataform.rst b/docs/apache-airflow-providers-google/operators/cloud/dataform.rst
new file mode 100644
index 0000000000..de9e5871a0
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataform.rst
@@ -0,0 +1,110 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Dataform Operators
+=========================
+
+Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL
+workflows for data transformation in BigQuery.
+
+Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process
+for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform
+helps you to transform it into a well-defined, tested, and documented suite of data tables.
+
+For more information about the task visit the `Dataform product documentation <https://cloud.google.com/dataform/docs/reference>`__.
+
+
+Configuration
+-------------
+
+Before you can use the Dataform operators you need to initialize a repository and workspace. For more
+information about this visit the `Dataform product documentation <https://cloud.google.com/dataform/docs/reference>`__.
+
+
+Create Compilation Result
+-------------------------
+A simple configuration to create a Compilation Result can look as follows:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCreateCompilationResultOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_create_compilation_result]
+    :end-before: [END howto_operator_create_compilation_result]
+
+Get Compilation Result
+----------------------
+
+To get a Compilation Result you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformGetCompilationResultOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_get_compilation_result]
+    :end-before: [END howto_operator_get_compilation_result]
+
+Create Workflow Invocation
+--------------------------
+
+To create a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCreateWorkflowInvocationOperator`
+
+This operation can be run synchronously or asynchronously. For the asynchronous mode there is also
+a sensor:
+:class:`~airflow.providers.google.cloud.sensors.dataform.DataformWorkflowInvocationStateSensor`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_workflow_invocation]
+    :end-before: [END howto_operator_create_workflow_invocation]
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_create_workflow_invocation_async]
+    :end-before: [END howto_operator_create_workflow_invocation_async]
+
+Get Workflow Invocation
+-----------------------
+
+To get a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformGetWorkflowInvocationOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_get_workflow_invocation]
+    :end-before: [END howto_operator_get_workflow_invocation]
+
+Cancel Workflow Invocation
+--------------------------
+
+To cancel a Workflow Invocation you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataform.DataformCancelWorkflowInvocationOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataform/example_dataform.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cancel_workflow_invocation]
+    :end-before: [END howto_operator_cancel_workflow_invocation]
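
The exampleinclude directives above extract the code between [START ...] and [END ...] markers in
example_dataform.py. A hedged sketch of that marker pattern; the arguments shown are illustrative
and not necessarily the committed example's exact content:

    # [START howto_operator_create_workflow_invocation]
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": compilation_result_name  # hypothetical variable
        },
    )
    # [END howto_operator_create_workflow_invocation]
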
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index e4d2c98602..a06292dc9f 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -99,6 +99,7 @@ DataTransferServiceClient
 Databricks
 Datadog
 Dataflow
+Dataform
 Dataframe
 Datalake
 Datanodes
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index dd06841df9..e65ee42833 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -306,6 +306,7 @@
       "google-cloud-build>=3.0.0",
       "google-cloud-container>=2.2.0,<3.0.0",
       "google-cloud-datacatalog>=3.0.0",
+      "google-cloud-dataform>=0.2.0",
       "google-cloud-dataplex>=0.1.0",
       "google-cloud-dataproc-metastore>=1.2.0,<2.0.0",
       "google-cloud-dataproc>=3.1.0",
diff --git a/tests/providers/google/cloud/hooks/test_dataform.py b/tests/providers/google/cloud/hooks/test_dataform.py
new file mode 100644
index 0000000000..4312b550e6
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_dataform.py
@@ -0,0 +1,157 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import unittest
+from unittest import mock
+
+from google.api_core.gapic_v1.method import DEFAULT
+
+from airflow.providers.google.cloud.hooks.dataform import DataformHook
+from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
+
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+DATAFORM_STRING = "airflow.providers.google.cloud.hooks.dataform.{}"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+REPOSITORY_ID = "test_repository"
+WORKSPACE_ID = "test_workspace"
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+COMPILATION_RESULT = {
+    "git_commitish": "main",
+    "workspace": (
+        f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/workspaces/{WORKSPACE_ID}"
+    ),
+}
+COMPILATION_RESULT_ID = "test_compilation_result_id"
+WORKFLOW_INVOCATION = {
+    "compilation_result": (
+        f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+        f"{REPOSITORY_ID}/compilationResults/{COMPILATION_RESULT_ID}"
+    ),
+}
+WORKFLOW_INVOCATION_ID = "test_workflow_invocation_id"
+
+
+class TestDataformHook(unittest.TestCase):
+    def setUp(self):
+        with mock.patch(
+            BASE_STRING.format("GoogleBaseHook.__init__"),
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.hook = DataformHook(
+                gcp_conn_id=GCP_CONN_ID,
+                delegate_to=DELEGATE_TO,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_create_compilation_result(self, mock_client):
+        self.hook.create_compilation_result(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            compilation_result=COMPILATION_RESULT,
+        )
+        parent = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}"
+        mock_client.return_value.create_compilation_result.assert_called_once_with(
+            request=dict(parent=parent, compilation_result=COMPILATION_RESULT),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_compilation_result"))
+    def get_compilation_result(self, mock_client):
+        self.hook.create_compilation_result(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+        )
+        name = (
+            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+            f"{REPOSITORY_ID}/compilationResults/{COMPILATION_RESULT_ID}"
+        )
+        mock_client.return_value.get_compilation_result.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_create_workflow_invocation(self, mock_client):
+        self.hook.create_workflow_invocation(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation=WORKFLOW_INVOCATION,
+        )
+        parent = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}"
+        mock_client.return_value.create_workflow_invocation.assert_called_once_with(
+            request=dict(parent=parent, workflow_invocation=WORKFLOW_INVOCATION),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_get_workflow_invocation(self, mock_client):
+        self.hook.get_workflow_invocation(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+        )
+        name = (
+            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+            f"{REPOSITORY_ID}/workflowInvocations/{WORKFLOW_INVOCATION_ID}"
+        )
+        mock_client.return_value.get_workflow_invocation.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAFORM_STRING.format("DataformHook.get_dataform_client"))
+    def test_cancel_workflow_invocation(self, mock_client):
+        self.hook.cancel_workflow_invocation(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+        )
+        name = (
+            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/"
+            f"{REPOSITORY_ID}/workflowInvocations/{WORKFLOW_INVOCATION_ID}"
+        )
+        mock_client.return_value.cancel_workflow_invocation.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataform.py b/tests/providers/google/cloud/operators/test_dataform.py
new file mode 100644
index 0000000000..e3d4c1c22c
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataform.py
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import TestCase, mock
+
+from google.api_core.gapic_v1.method import DEFAULT
+
+from airflow.providers.google.cloud.operators.dataform import (
+    DataformCancelWorkflowInvocationOperator,
+    DataformCreateCompilationResultOperator,
+    DataformCreateWorkflowInvocationOperator,
+    DataformGetCompilationResultOperator,
+    DataformGetWorkflowInvocationOperator,
+)
+
+HOOK_STR = "airflow.providers.google.cloud.operators.dataform.DataformHook"
+WORKFLOW_INVOCATION_STR = "airflow.providers.google.cloud.operators.dataform.WorkflowInvocation"
+COMPILATION_RESULT_STR = "airflow.providers.google.cloud.operators.dataform.CompilationResult"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+REPOSITORY_ID = "test_repository_id"
+WORKSPACE_ID = "test_workspace_id"
+WORKSPACE = f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/workspaces/{WORKSPACE_ID}"
+COMPILATION_RESULT_ID = "test_compilation_result_id"
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+WORKFLOW_INVOCATION = {
+    "compilation_result": (
+        f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
+        f"compilationResults/{COMPILATION_RESULT_ID}"
+    )
+}
+WORKFLOW_INVOCATION_ID = "test_workflow_invocation"
+
+
+class TestDataformCreateCompilationResultOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(COMPILATION_RESULT_STR)
+    def test_execute(self, compilation_result_mock, hook_mock):
+        compilation_result = {
+            "git_commitish": "main",
+            "workspace": WORKSPACE,
+        }
+        op = DataformCreateCompilationResultOperator(
+            task_id="create_compilation_result",
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            compilation_result=compilation_result,
+        )
+        compilation_result_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.return_value.create_compilation_result.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            compilation_result=compilation_result,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataformGetCompilationResultOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(COMPILATION_RESULT_STR)
+    def test_execute(self, compilation_result_mock, hook_mock):
+        op = DataformGetCompilationResultOperator(
+            task_id="get_compilation_result",
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            compilation_result_id=COMPILATION_RESULT_ID,
+        )
+        compilation_result_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.return_value.get_compilation_result.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            compilation_result_id=COMPILATION_RESULT_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataformCreateWorkflowInvocationOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(WORKFLOW_INVOCATION_STR)
+    def test_execute(self, workflow_invocation_mock, hook_mock):
+        op = DataformCreateWorkflowInvocationOperator(
+            task_id="create_workflow_invocation",
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation=WORKFLOW_INVOCATION,
+        )
+        workflow_invocation_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+
+        hook_mock.return_value.create_workflow_invocation.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation=WORKFLOW_INVOCATION,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataformGetWorkflowInvocationOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(WORKFLOW_INVOCATION_STR)
+    def test_execute(self, workflow_invocation_mock, hook_mock):
+        op = DataformGetWorkflowInvocationOperator(
+            task_id="get_workflow_invocation",
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+        )
+
+        workflow_invocation_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+
+        hook_mock.return_value.get_workflow_invocation.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataformCancelWorkflowInvocationOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataformCancelWorkflowInvocationOperator(
+            task_id="cancel_workflow_invocation",
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+        )
+        op.execute(context=mock.MagicMock())
+        hook_mock.return_value.cancel_workflow_invocation.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            repository_id=REPOSITORY_ID,
+            workflow_invocation_id=WORKFLOW_INVOCATION_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
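
All of these operator tests rely on the same indirection: mock.patch is given the
dotted path under which the operators module looks the name up (it imported
DataformHook directly), so execute() never reaches GCP, and the proto class is
patched as well so its to_dict() is never invoked on a MagicMock. A minimal
sketch of that patching, assuming the google provider is installed:

    from unittest import mock

    # Patch where the name is *used*, not where it is defined: after the import,
    # the operators module holds its own reference to DataformHook.
    with mock.patch("airflow.providers.google.cloud.operators.dataform.DataformHook") as hook_mock:
        hook_mock.return_value.get_workflow_invocation.return_value = {"state": "SUCCEEDED"}
        # Any operator constructed inside this block would now talk to the mock.
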
diff --git a/tests/system/providers/google/cloud/dataform/__init__.py b/tests/system/providers/google/cloud/dataform/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataform/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/dataform/example_dataform.py b/tests/system/providers/google/cloud/dataform/example_dataform.py
new file mode 100644
index 0000000000..d07d42898e
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataform/example_dataform.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataform service
+"""
+import os
+from datetime import datetime
+
+from google.cloud.dataform_v1beta1 import WorkflowInvocation
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.dataform import (
+    DataformCancelWorkflowInvocationOperator,
+    DataformCreateCompilationResultOperator,
+    DataformCreateWorkflowInvocationOperator,
+    DataformGetCompilationResultOperator,
+    DataformGetWorkflowInvocationOperator,
+)
+from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "test-project-id")
+
+DAG_ID = "dataform"
+
+REPOSITORY_ID = "dataform-test2"
+REGION = "us-central1"
+WORKSPACE_ID = "testing"
+
+# This DAG cannot run on its own: extra configuration is required to execute it in an automated test process
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',  # Override to match your needs
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example', 'dataform'],
+) as dag:
+    # [START howto_operator_create_compilation_result]
+    create_compilation_result = DataformCreateCompilationResultOperator(
+        task_id="create_compilation_result",
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        compilation_result={
+            "git_commitish": "main",
+            "workspace": (
+                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
+                f"workspaces/{WORKSPACE_ID}"
+            ),
+        },
+    )
+    # [END howto_operator_create_compilation_result]
+
+    # [START howto_operator_get_compilation_result]
+    get_compilation_result = DataformGetCompilationResultOperator(
+        task_id="get_compilation_result",
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        compilation_result_id=(
+            "{{ 
task_instance.xcom_pull('create_compilation_result')['name'].split('/')[-1] }}"
+        ),
+    )
+    # [END howto_operator_get_compilation_result]
+
+    # [START howto_operator_create_workflow_invocation]
+    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
+        task_id='create_workflow_invocation',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation={
+            "compilation_result": "{{ 
task_instance.xcom_pull('create_compilation_result')['name'] }}"
+        },
+    )
+    # [END howto_operator_create_workflow_invocation]
+
+    # [START howto_operator_create_workflow_invocation_async]
+    create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
+        task_id='create_workflow_invocation_async',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        asynchronous=True,
+        workflow_invocation={
+            "compilation_result": "{{ 
task_instance.xcom_pull('create_compilation_result')['name'] }}"
+        },
+    )
+
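+    # With asynchronous=True the operator above only submits the workflow invocation
+    # and returns immediately; the sensor below then polls the invocation until it
+    # reaches one of the expected terminal states.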
+    is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
+        task_id="is_workflow_invocation_done",
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation_id=(
+            "{{ 
task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
+        ),
+        expected_statuses={WorkflowInvocation.State.SUCCEEDED},
+    )
+    # [END howto_operator_create_workflow_invocation_async]
+
+    # [START howto_operator_get_workflow_invocation]
+    get_workflow_invocation = DataformGetWorkflowInvocationOperator(
+        task_id='get_workflow_invocation',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation_id=(
+            "{{ 
task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
+        ),
+    )
+    # [END howto_operator_get_workflow_invocation]
+
+    create_second_workflow_invocation = DataformCreateWorkflowInvocationOperator(
+        task_id='create_second_workflow_invocation',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation={
+            "compilation_result": "{{ 
task_instance.xcom_pull('create_compilation_result')['name'] }}"
+        },
+    )
+
+    # [START howto_operator_cancel_workflow_invocation]
+    cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
+        task_id='cancel_workflow_invocation',
+        project_id=PROJECT_ID,
+        region=REGION,
+        repository_id=REPOSITORY_ID,
+        workflow_invocation_id=(
+            "{{ 
task_instance.xcom_pull('create_second_workflow_invocation')['name'].split('/')[-1]
 }}"
+        ),
+    )
+    # [END howto_operator_cancel_workflow_invocation]
+
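+    # Note: `t1 >> t2` sets the t1 -> t2 dependency and returns t2, so each `>>`
+    # pair below keeps its own edge while handing its downstream task to chain()
+    # for the overall ordering.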
+    chain(
+        create_compilation_result,
+        get_compilation_result,
+        create_workflow_invocation >> get_workflow_invocation,
+        create_workflow_invocation_async >> is_workflow_invocation_done,
+        create_second_workflow_invocation >> cancel_workflow_invocation,
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
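
A note on the templated fields in this DAG: the create operators push the full API
resource name to XCom, and the downstream *_id parameters need only its last path
segment, which is what the .split('/')[-1] in the Jinja expressions extracts. A
minimal illustration with assumed values:

    # Assumed XCom payload from a create_workflow_invocation task:
    xcom_payload = {
        "name": (
            "projects/test-project-id/locations/us-central1/repositories/"
            "dataform-test2/workflowInvocations/12345"
        )
    }
    # The bare id expected by workflow_invocation_id:
    assert xcom_payload["name"].split("/")[-1] == "12345"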
