This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e926bb9bb0 Add deferrable mode to DataFusionStartPipelineOperator 
(#28690)
e926bb9bb0 is described below

commit e926bb9bb0ce97b4bb32e49279bde237ba8935ed
Author: VladaZakharova <[email protected]>
AuthorDate: Fri Jan 20 21:57:19 2023 +0100

    Add deferrable mode to DataFusionStartPipelineOperator (#28690)
---
 airflow/providers/google/cloud/hooks/datafusion.py | 105 +++++++++-
 airflow/providers/google/cloud/links/datafusion.py | 130 ++++++++++++
 .../providers/google/cloud/operators/datafusion.py | 228 +++++++++++----------
 .../providers/google/cloud/sensors/datafusion.py   |   4 +-
 .../providers/google/cloud/triggers/datafusion.py  | 131 ++++++++++++
 airflow/providers/google/provider.yaml             |   6 +-
 .../operators/cloud/datafusion.rst                 |  37 ++--
 docs/spelling_wordlist.txt                         |   3 +
 .../google/cloud/hooks/test_datafusion.py          | 158 +++++++++++++-
 .../google/cloud/operators/test_datafusion.py      | 159 ++++++++++++--
 .../cloud/operators/test_datafusion_system.py      |  40 ----
 .../google/cloud/sensors/test_datafusion.py        |  27 ++-
 .../google/cloud/triggers/test_datafusion.py       | 145 +++++++++++++
 .../providers/google/cloud/datafusion/__init__.py  |  16 ++
 .../google/cloud/datafusion}/example_datafusion.py |  58 ++++--
 .../cloud/datafusion/example_datafusion_async.py   | 100 ++++-----
 16 files changed, 1085 insertions(+), 262 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/datafusion.py 
b/airflow/providers/google/cloud/hooks/datafusion.py
index 89861a0c4f..fa315af576 100644
--- a/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/airflow/providers/google/cloud/hooks/datafusion.py
@@ -21,14 +21,20 @@ import json
 import os
 from time import monotonic, sleep
 from typing import Any, Dict, Sequence
-from urllib.parse import quote, urlencode
+from urllib.parse import quote, urlencode, urljoin
 
 import google.auth
+from aiohttp import ClientSession
+from gcloud.aio.auth import AioSession, Token
 from google.api_core.retry import exponential_sleep_generator
 from googleapiclient.discovery import Resource, build
 
-from airflow.exceptions import AirflowException
-from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID, GoogleBaseHook
+from airflow.exceptions import AirflowException, AirflowNotFoundException
+from airflow.providers.google.common.hooks.base_google import (
+    PROVIDE_PROJECT_ID,
+    GoogleBaseAsyncHook,
+    GoogleBaseHook,
+)
 
 Operation = Dict[str, Any]
 
@@ -154,12 +160,14 @@ class DataFusionHook(GoogleBaseHook):
 
     @staticmethod
     def _check_response_status_and_data(response, message: str) -> None:
-        if response.status != 200:
+        if response.status == 404:
+            raise AirflowNotFoundException(message)
+        elif response.status != 200:
             raise AirflowException(message)
         if response.data is None:
             raise AirflowException(
                 "Empty response received. Please, check for possible root "
-                "causes of this behavior either in DAG code or on Cloud 
Datafusion side"
+                "causes of this behavior either in DAG code or on Cloud 
DataFusion side"
             )
 
     def get_conn(self) -> Resource:
@@ -418,7 +426,7 @@ class DataFusionHook(GoogleBaseHook):
         :param pipeline_name: Your pipeline name.
         :param instance_url: Endpoint on which the REST APIs is accessible for 
the instance.
         :param runtime_args: Optional runtime JSON args to be passed to the 
pipeline
-        :param namespace: f your pipeline belongs to a Basic edition instance, 
the namespace ID
+        :param namespace: if your pipeline belongs to a Basic edition 
instance, the namespace ID
             is always default. If your pipeline belongs to an Enterprise 
edition instance, you
             can create a namespace.
         """
@@ -469,3 +477,88 @@ class DataFusionHook(GoogleBaseHook):
         self._check_response_status_and_data(
             response, f"Stopping a pipeline failed with code {response.status}"
         )
+
+
class DataFusionAsyncHook(GoogleBaseAsyncHook):
    """Asynchronous hook for Google Cloud Data Fusion REST APIs."""

    sync_hook_class = DataFusionHook
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]

    @staticmethod
    def _base_url(instance_url: str, namespace: str) -> str:
        """Build the base CDAP "apps" URL for the given instance and namespace."""
        return urljoin(f"{instance_url}/", f"v3/namespaces/{quote(namespace)}/apps/")

    async def _get_link(self, url: str, session):
        """
        Perform an authorized asynchronous GET on ``url``.

        :param url: Full URL of the pipeline run resource.
        :param session: An ``aiohttp.ClientSession`` to issue the request with.
        :return: The response object, or ``None`` when the request raised
            ``AirflowException`` (the pipeline run may not be visible in the
            system yet; callers treat ``None`` as "pending").
        """
        # Initialize first: the original code left this name unbound when the
        # swallowed exception fired, raising UnboundLocalError at return.
        pipeline = None
        async with Token(scopes=self.scopes) as token:
            session_aio = AioSession(session)
            headers = {
                "Authorization": f"Bearer {await token.get()}",
            }
            try:
                pipeline = await session_aio.get(url=url, headers=headers)
            except AirflowException:
                pass  # Because the pipeline may not be visible in system yet
        return pipeline

    async def get_pipeline(
        self,
        instance_url: str,
        namespace: str,
        pipeline_name: str,
        pipeline_id: str,
        session,
    ):
        """Fetch a single pipeline workflow run resource asynchronously."""
        base_url_link = self._base_url(instance_url, namespace)
        url = urljoin(
            base_url_link, f"{quote(pipeline_name)}/workflows/DataPipelineWorkflow/runs/{quote(pipeline_id)}"
        )
        return await self._get_link(url=url, session=session)

    async def get_pipeline_status(
        self,
        pipeline_name: str,
        instance_url: str,
        pipeline_id: str,
        namespace: str = "default",
        success_states: list[str] | None = None,
    ) -> str:
        """
        Gets a Cloud Data Fusion pipeline status asynchronously.

        :param pipeline_name: Your pipeline name.
        :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
        :param pipeline_id: Unique pipeline ID associated with specific pipeline.
        :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
            is always default. If your pipeline belongs to an Enterprise edition instance, you
            can create a namespace.
        :param success_states: If provided the operator will wait for pipeline to be in one of
            the provided states.
        :return: ``"success"``, ``"failed"``, ``"pending"``, or an error message string.
        """
        success_states = success_states or SUCCESS_STATES
        async with ClientSession() as session:
            try:
                pipeline = await self.get_pipeline(
                    instance_url=instance_url,
                    namespace=namespace,
                    pipeline_name=pipeline_name,
                    pipeline_id=pipeline_id,
                    session=session,
                )
                if pipeline is None:
                    # Run is not visible in the system yet -- keep polling.
                    return "pending"
                self.log.info("Response pipeline: %s", pipeline)
                pipeline = await pipeline.json(content_type=None)
                current_pipeline_state = pipeline["status"]

                if current_pipeline_state in success_states:
                    pipeline_status = "success"
                elif current_pipeline_state in FAILURE_STATES:
                    pipeline_status = "failed"
                else:
                    pipeline_status = "pending"
            except OSError:
                # Transient network failure: report "pending" so the caller retries.
                pipeline_status = "pending"
            except Exception as e:
                self.log.info("Retrieving pipeline status finished with errors...")
                pipeline_status = str(e)
            return pipeline_status
diff --git a/airflow/providers/google/cloud/links/datafusion.py 
b/airflow/providers/google/cloud/links/datafusion.py
new file mode 100644
index 0000000000..ae1d9d4ada
--- /dev/null
+++ b/airflow/providers/google/cloud/links/datafusion.py
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Compute Engine links."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, ClassVar
+
+from airflow.models import BaseOperatorLink, XCom
+
+if TYPE_CHECKING:
+    from airflow.models import BaseOperator
+    from airflow.models.taskinstance import TaskInstanceKey
+    from airflow.utils.context import Context
+
+
+BASE_LINK = "https://console.cloud.google.com/data-fusion"
+DATAFUSION_INSTANCE_LINK = BASE_LINK + 
"/locations/{region}/instances/{instance_name}?project={project_id}"
+DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/default/pipelines"
+DATAFUSION_PIPELINE_LINK = "{uri}/pipelines/ns/default/view/{pipeline_name}"
+
+
class BaseGoogleLink(BaseOperatorLink):
    """
    Base class for Data Fusion operator links.

    Overrides the base logic to avoid prepending
    'https://console.cloud.google.com' in front of links that are built from
    an instance ``uri``: the persisted configuration is formatted directly
    into ``format_str``.
    """

    name: ClassVar[str]
    key: ClassVar[str]
    format_str: ClassVar[str]

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey,
    ) -> str:
        """
        Build the link from the configuration persisted in XCom.

        :return: The formatted link, or an empty string when no configuration
            was pushed (e.g. the task has not run yet).
        """
        conf = XCom.get_value(key=self.key, ti_key=ti_key)
        if not conf:
            return ""
        # The original "startswith('http')" branch returned the exact same
        # expression as the fallthrough, so the dead conditional is removed.
        return self.format_str.format(**conf)
+
+
class DataFusionInstanceLink(BaseGoogleLink):
    """Operator link pointing at a Data Fusion instance in the Cloud Console."""

    name = "Data Fusion Instance"
    key = "instance_conf"
    format_str = DATAFUSION_INSTANCE_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance: BaseOperator,
        location: str,
        instance_name: str,
        project_id: str,
    ):
        """Store the link configuration in XCom so the link can be rendered later."""
        link_params = {
            "region": location,
            "instance_name": instance_name,
            "project_id": project_id,
        }
        task_instance.xcom_push(
            context=context,
            key=DataFusionInstanceLink.key,
            value=link_params,
        )
+
+
class DataFusionPipelineLink(BaseGoogleLink):
    """Operator link pointing at a single Data Fusion pipeline."""

    name = "Data Fusion Pipeline"
    key = "pipeline_conf"
    format_str = DATAFUSION_PIPELINE_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance: BaseOperator,
        uri: str,
        pipeline_name: str,
    ):
        """Store the link configuration in XCom so the link can be rendered later."""
        link_params = {
            "uri": uri,
            "pipeline_name": pipeline_name,
        }
        task_instance.xcom_push(
            context=context,
            key=DataFusionPipelineLink.key,
            value=link_params,
        )
+
+
class DataFusionPipelinesLink(BaseGoogleLink):
    """Operator link pointing at the list of Data Fusion pipelines."""

    name = "Data Fusion Pipelines List"
    key = "pipelines_conf"
    format_str = DATAFUSION_PIPELINES_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance: BaseOperator,
        uri: str,
    ):
        """Store the link configuration in XCom so the link can be rendered later."""
        task_instance.xcom_push(
            context=context,
            key=DataFusionPipelinesLink.key,
            value={"uri": uri},
        )
diff --git a/airflow/providers/google/cloud/operators/datafusion.py 
b/airflow/providers/google/cloud/operators/datafusion.py
index f17255a42a..971909ce67 100644
--- a/airflow/providers/google/cloud/operators/datafusion.py
+++ b/airflow/providers/google/cloud/operators/datafusion.py
@@ -23,20 +23,20 @@ from typing import TYPE_CHECKING, Any, Sequence
 from google.api_core.retry import exponential_sleep_generator
 from googleapiclient.errors import HttpError
 
+from airflow import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.datafusion import SUCCESS_STATES, 
DataFusionHook, PipelineStates
-from airflow.providers.google.cloud.links.base import BaseGoogleLink
+from airflow.providers.google.cloud.links.datafusion import (
+    DataFusionInstanceLink,
+    DataFusionPipelineLink,
+    DataFusionPipelinesLink,
+)
+from airflow.providers.google.cloud.triggers.datafusion import 
DataFusionStartPipelineTrigger
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
-BASE_LINK = "https://console.cloud.google.com/data-fusion";
-DATAFUSION_INSTANCE_LINK = BASE_LINK + 
"/locations/{region}/instances/{instance_name}?project={project_id}"
-DATAFUSION_PIPELINES_LINK = "{uri}/cdap/ns/default/pipelines"
-DATAFUSION_PIPELINE_LINK = "{uri}/pipelines/ns/default/view/{pipeline_name}"
-
-
 class DataFusionPipelineLinkHelper:
     """Helper class for Pipeline links"""
 
@@ -47,84 +47,6 @@ class DataFusionPipelineLinkHelper:
         return project_id
 
 
-class DataFusionInstanceLink(BaseGoogleLink):
-    """Helper class for constructing Data Fusion Instance link"""
-
-    name = "Data Fusion Instance"
-    key = "instance_conf"
-    format_str = DATAFUSION_INSTANCE_LINK
-
-    @staticmethod
-    def persist(
-        context: Context,
-        task_instance: (
-            CloudDataFusionRestartInstanceOperator
-            | CloudDataFusionCreateInstanceOperator
-            | CloudDataFusionUpdateInstanceOperator
-            | CloudDataFusionGetInstanceOperator
-        ),
-        project_id: str,
-    ):
-        task_instance.xcom_push(
-            context=context,
-            key=DataFusionInstanceLink.key,
-            value={
-                "region": task_instance.location,
-                "instance_name": task_instance.instance_name,
-                "project_id": project_id,
-            },
-        )
-
-
-class DataFusionPipelineLink(BaseGoogleLink):
-    """Helper class for constructing Data Fusion Pipeline link"""
-
-    name = "Data Fusion Pipeline"
-    key = "pipeline_conf"
-    format_str = DATAFUSION_PIPELINE_LINK
-
-    @staticmethod
-    def persist(
-        context: Context,
-        task_instance: (
-            CloudDataFusionCreatePipelineOperator
-            | CloudDataFusionStartPipelineOperator
-            | CloudDataFusionStopPipelineOperator
-        ),
-        uri: str,
-    ):
-        task_instance.xcom_push(
-            context=context,
-            key=DataFusionPipelineLink.key,
-            value={
-                "uri": uri,
-                "pipeline_name": task_instance.pipeline_name,
-            },
-        )
-
-
-class DataFusionPipelinesLink(BaseGoogleLink):
-    """Helper class for constructing list of Data Fusion Pipelines link"""
-
-    name = "Data Fusion Pipelines"
-    key = "pipelines_conf"
-    format_str = DATAFUSION_PIPELINES_LINK
-
-    @staticmethod
-    def persist(
-        context: Context,
-        task_instance: CloudDataFusionListPipelinesOperator,
-        uri: str,
-    ):
-        task_instance.xcom_push(
-            context=context,
-            key=DataFusionPipelinesLink.key,
-            value={
-                "uri": uri,
-            },
-        )
-
-
 class CloudDataFusionRestartInstanceOperator(BaseOperator):
     """
     Restart a single Data Fusion instance.
@@ -196,7 +118,13 @@ class CloudDataFusionRestartInstanceOperator(BaseOperator):
         self.log.info("Instance %s restarted successfully", self.instance_name)
 
         project_id = self.project_id or 
DataFusionPipelineLinkHelper.get_project_id(instance)
-        DataFusionInstanceLink.persist(context=context, task_instance=self, 
project_id=project_id)
+        DataFusionInstanceLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=project_id,
+            instance_name=self.instance_name,
+            location=self.location,
+        )
 
 
 class CloudDataFusionDeleteInstanceOperator(BaseOperator):
@@ -360,7 +288,13 @@ class CloudDataFusionCreateInstanceOperator(BaseOperator):
                 )
 
         project_id = self.project_id or 
DataFusionPipelineLinkHelper.get_project_id(instance)
-        DataFusionInstanceLink.persist(context=context, task_instance=self, 
project_id=project_id)
+        DataFusionInstanceLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=project_id,
+            instance_name=self.instance_name,
+            location=self.location,
+        )
         return instance
 
 
@@ -449,7 +383,13 @@ class CloudDataFusionUpdateInstanceOperator(BaseOperator):
         self.log.info("Instance %s updated successfully", self.instance_name)
 
         project_id = self.project_id or 
DataFusionPipelineLinkHelper.get_project_id(instance)
-        DataFusionInstanceLink.persist(context=context, task_instance=self, 
project_id=project_id)
+        DataFusionInstanceLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=project_id,
+            instance_name=self.instance_name,
+            location=self.location,
+        )
 
 
 class CloudDataFusionGetInstanceOperator(BaseOperator):
@@ -520,7 +460,13 @@ class CloudDataFusionGetInstanceOperator(BaseOperator):
         )
 
         project_id = self.project_id or 
DataFusionPipelineLinkHelper.get_project_id(instance)
-        DataFusionInstanceLink.persist(context=context, task_instance=self, 
project_id=project_id)
+        DataFusionInstanceLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=project_id,
+            instance_name=self.instance_name,
+            location=self.location,
+        )
         return instance
 
 
@@ -555,12 +501,13 @@ class CloudDataFusionCreatePipelineOperator(BaseOperator):
         account from the list granting this role to the originating account 
(templated).
     """
 
+    operator_extra_links = (DataFusionPipelineLink(),)
+
     template_fields: Sequence[str] = (
         "instance_name",
         "pipeline_name",
         "impersonation_chain",
     )
-    operator_extra_links = (DataFusionPipelineLink(),)
 
     def __init__(
         self,
@@ -609,9 +556,13 @@ class CloudDataFusionCreatePipelineOperator(BaseOperator):
             instance_url=api_url,
             namespace=self.namespace,
         )
-
-        DataFusionPipelineLink.persist(context=context, task_instance=self, 
uri=instance["serviceEndpoint"])
-        self.log.info("Pipeline created")
+        DataFusionPipelineLink.persist(
+            context=context,
+            task_instance=self,
+            uri=instance["serviceEndpoint"],
+            pipeline_name=self.pipeline_name,
+        )
+        self.log.info("Pipeline %s created", self.pipeline_name)
 
 
 class CloudDataFusionDeletePipelineOperator(BaseOperator):
@@ -781,15 +732,16 @@ class CloudDataFusionListPipelinesOperator(BaseOperator):
             project_id=self.project_id,
         )
         api_url = instance["apiEndpoint"]
+        service_endpoint = instance["serviceEndpoint"]
         pipelines = hook.list_pipelines(
             instance_url=api_url,
             namespace=self.namespace,
             artifact_version=self.artifact_version,
             artifact_name=self.artifact_name,
         )
-        self.log.info("%s", pipelines)
+        self.log.info("Pipelines: %s", pipelines)
 
-        DataFusionPipelinesLink.persist(context=context, task_instance=self, 
uri=instance["serviceEndpoint"])
+        DataFusionPipelinesLink.persist(context=context, task_instance=self, 
uri=service_endpoint)
         return pipelines
 
 
@@ -825,9 +777,14 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
-    :param asynchronous: Flag to return after submitting the pipeline Id to 
the Data Fusion API.
-        This is useful for submitting long running pipelines and
+    :param asynchronous: Flag to return after submitting the pipeline ID to 
the Data Fusion API.
+        This is useful for submitting long-running pipelines and
         waiting on them asynchronously using the 
CloudDataFusionPipelineStateSensor
+    :param deferrable: Run operator in the deferrable mode. Is not related to 
asynchronous parameter. While
+        asynchronous parameter gives a possibility to wait until pipeline 
reaches terminate state using
+        sleep() method, deferrable mode checks for the state using 
asynchronous calls. It is not possible to
+        use both asynchronous and deferrable parameters at the same time.
+    :param poll_interval: Polling period in seconds to check for the status. 
Used only in deferrable mode.
     """
 
     template_fields: Sequence[str] = (
@@ -854,6 +811,8 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
         asynchronous=False,
+        deferrable=False,
+        poll_interval=3.0,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -869,6 +828,8 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         self.asynchronous = asynchronous
         self.pipeline_timeout = pipeline_timeout
+        self.deferrable = deferrable
+        self.poll_interval = poll_interval
 
         if success_states:
             self.success_states = success_states
@@ -897,21 +858,63 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
         )
         self.log.info("Pipeline %s submitted successfully.", pipeline_id)
 
-        DataFusionPipelineLink.persist(context=context, task_instance=self, 
uri=instance["serviceEndpoint"])
-
-        if not self.asynchronous:
-            self.log.info("Waiting when pipeline %s will be in one of the 
success states", pipeline_id)
-            hook.wait_for_pipeline_state(
-                success_states=self.success_states,
-                pipeline_id=pipeline_id,
-                pipeline_name=self.pipeline_name,
-                namespace=self.namespace,
-                instance_url=api_url,
-                timeout=self.pipeline_timeout,
+        DataFusionPipelineLink.persist(
+            context=context,
+            task_instance=self,
+            uri=instance["serviceEndpoint"],
+            pipeline_name=self.pipeline_name,
+        )
+
+        if self.deferrable:
+            if self.asynchronous:
+                raise AirflowException(
+                    "Both asynchronous and deferrable parameters were passed. 
Please, provide only one."
+                )
+            self.defer(
+                trigger=DataFusionStartPipelineTrigger(
+                    success_states=self.success_states,
+                    instance_url=api_url,
+                    namespace=self.namespace,
+                    pipeline_name=self.pipeline_name,
+                    pipeline_id=pipeline_id,
+                    poll_interval=self.poll_interval,
+                    gcp_conn_id=self.gcp_conn_id,
+                    impersonation_chain=self.impersonation_chain,
+                    delegate_to=self.delegate_to,
+                ),
+                method_name="execute_complete",
             )
-            self.log.info("Job %s discover success state.", pipeline_id)
+        else:
+            if not self.asynchronous:
+                # when NOT using asynchronous mode it will just wait for 
pipeline to finish and print message
+                self.log.info("Waiting when pipeline %s will be in one of the 
success states", pipeline_id)
+                hook.wait_for_pipeline_state(
+                    success_states=self.success_states,
+                    pipeline_id=pipeline_id,
+                    pipeline_name=self.pipeline_name,
+                    namespace=self.namespace,
+                    instance_url=api_url,
+                    timeout=self.pipeline_timeout,
+                )
+                self.log.info("Pipeline %s discovered success state.", 
pipeline_id)
+            #  otherwise, return pipeline_id so that sensor can use it later 
to check the pipeline state
         return pipeline_id
 
+    def execute_complete(self, context: Context, event: dict[str, Any]):
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes 
execution was
+        successful.
+        """
+        if event["status"] == "error":
+            raise AirflowException(event["message"])
+        self.log.info(
+            "%s completed with response %s ",
+            self.task_id,
+            event["message"],
+        )
+        return event["pipeline_id"]
+
 
 class CloudDataFusionStopPipelineOperator(BaseOperator):
     """
@@ -989,7 +992,12 @@ class CloudDataFusionStopPipelineOperator(BaseOperator):
         )
         api_url = instance["apiEndpoint"]
 
-        DataFusionPipelineLink.persist(context=context, task_instance=self, 
uri=instance["serviceEndpoint"])
+        DataFusionPipelineLink.persist(
+            context=context,
+            task_instance=self,
+            uri=instance["serviceEndpoint"],
+            pipeline_name=self.pipeline_name,
+        )
         hook.stop_pipeline(
             pipeline_name=self.pipeline_name,
             instance_url=api_url,
diff --git a/airflow/providers/google/cloud/sensors/datafusion.py 
b/airflow/providers/google/cloud/sensors/datafusion.py
index f15f423a4a..edf4515381 100644
--- a/airflow/providers/google/cloud/sensors/datafusion.py
+++ b/airflow/providers/google/cloud/sensors/datafusion.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING, Iterable, Sequence
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
 from airflow.sensors.base import BaseSensorOperator
 
@@ -114,6 +114,8 @@ class 
CloudDataFusionPipelineStateSensor(BaseSensorOperator):
                 namespace=self.namespace,
             )
             pipeline_status = pipeline_workflow["status"]
+        except AirflowNotFoundException:
+            raise AirflowException("Specified Pipeline ID was not found.")
         except AirflowException:
             pass  # Because the pipeline may not be visible in system yet
         if pipeline_status is not None:
diff --git a/airflow/providers/google/cloud/triggers/datafusion.py 
b/airflow/providers/google/cloud/triggers/datafusion.py
new file mode 100644
index 0000000000..58d98faa2d
--- /dev/null
+++ b/airflow/providers/google/cloud/triggers/datafusion.py
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator, Sequence
+
+from airflow.providers.google.cloud.hooks.datafusion import DataFusionAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
class DataFusionStartPipelineTrigger(BaseTrigger):
    """
    Trigger to perform checking the pipeline status until it reaches terminate state.

    :param pipeline_name: Your pipeline name.
    :param instance_url: Endpoint on which the REST APIs is accessible for the instance.
    :param pipeline_id: Unique pipeline ID associated with specific pipeline
    :param namespace: if your pipeline belongs to a Basic edition instance, the namespace ID
       is always default. If your pipeline belongs to an Enterprise edition instance, you
       can create a namespace.
    :param gcp_conn_id: Reference to google cloud connection id
    :param poll_interval: polling period in seconds to check for the status
    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
        if any. For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param success_states: If provided, wait until the pipeline reaches one of these states.
    """

    def __init__(
        self,
        instance_url: str,
        namespace: str,
        pipeline_name: str,
        pipeline_id: str,
        poll_interval: float = 3.0,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        delegate_to: str | None = None,
        success_states: list[str] | None = None,
    ):
        super().__init__()
        self.instance_url = instance_url
        self.namespace = namespace
        self.pipeline_name = pipeline_name
        self.pipeline_id = pipeline_id
        self.poll_interval = poll_interval
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.delegate_to = delegate_to
        self.success_states = success_states

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes DataFusionStartPipelineTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.datafusion.DataFusionStartPipelineTrigger",
            {
                "gcp_conn_id": self.gcp_conn_id,
                "instance_url": self.instance_url,
                "namespace": self.namespace,
                "pipeline_name": self.pipeline_name,
                "pipeline_id": self.pipeline_id,
                "success_states": self.success_states,
                # These three were previously dropped from serialization, so a
                # rehydrated trigger silently fell back to the default polling
                # interval and lost impersonation/delegation settings.
                "poll_interval": self.poll_interval,
                "impersonation_chain": self.impersonation_chain,
                "delegate_to": self.delegate_to,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Poll the current pipeline status and yield a terminal TriggerEvent."""
        hook = self._get_async_hook()
        while True:
            try:
                # Poll for job execution status
                response_from_hook = await hook.get_pipeline_status(
                    success_states=self.success_states,
                    instance_url=self.instance_url,
                    namespace=self.namespace,
                    pipeline_name=self.pipeline_name,
                    pipeline_id=self.pipeline_id,
                )
                if response_from_hook == "success":
                    yield TriggerEvent(
                        {
                            "pipeline_id": self.pipeline_id,
                            "status": "success",
                            "message": "Pipeline is running",
                        }
                    )
                    return
                elif response_from_hook == "pending":
                    self.log.info("Pipeline is not finished yet...")
                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
                    await asyncio.sleep(self.poll_interval)
                else:
                    yield TriggerEvent({"status": "error", "message": response_from_hook})
                    return

            except Exception as e:
                self.log.exception("Exception occurred while checking for pipeline state")
                yield TriggerEvent({"status": "error", "message": str(e)})
                # Stop polling: without this return the loop kept running and
                # could emit further events after the task had already failed.
                return

    def _get_async_hook(self) -> DataFusionAsyncHook:
        """Create the asynchronous hook used for status polling."""
        return DataFusionAsyncHook(
            instance_url=self.instance_url,
            namespace=self.namespace,
            pipeline_name=self.pipeline_name,
            pipeline_id=self.pipeline_id,
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
        )
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index 8efa9ab5e5..43c1f53e44 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -970,9 +970,9 @@ extra-links:
   - airflow.providers.google.cloud.links.dataform.DataformRepositoryLink
   - airflow.providers.google.cloud.links.dataform.DataformWorkspaceLink
   - 
airflow.providers.google.cloud.links.dataform.DataformWorkflowInvocationLink
-  - airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink
-  - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink
-  - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
+  - airflow.providers.google.cloud.links.datafusion.DataFusionInstanceLink
+  - airflow.providers.google.cloud.links.datafusion.DataFusionPipelineLink
+  - airflow.providers.google.cloud.links.datafusion.DataFusionPipelinesLink
   - airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceLink
   - airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceDatabaseLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
diff --git 
a/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst 
b/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
index 70c25afe0e..e4778a758b 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
@@ -40,7 +40,7 @@ Restart DataFusion Instance
 To restart Data Fusion instance use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionRestartInstanceOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_restart_instance_operator]
@@ -59,7 +59,7 @@ Delete DataFusion Instance
 To delete Data Fusion instance use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionDeleteInstanceOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_delete_instance_operator]
@@ -79,7 +79,7 @@ Create DataFusion Instance
 To create Data Fusion instance use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionCreateInstanceOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_create_instance_operator]
@@ -99,7 +99,7 @@ Update DataFusion Instance
 To update Data Fusion instance use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionUpdateInstanceOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_update_instance_operator]
@@ -118,7 +118,7 @@ Get DataFusion Instance
 To retrieve Data Fusion instance use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionGetInstanceOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_get_instance_operator]
@@ -138,7 +138,7 @@ Create a DataFusion pipeline
 To create Data Fusion pipeline use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionCreatePipelineOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_create_pipeline]
@@ -157,7 +157,7 @@ Start a DataFusion pipeline
 To start Data Fusion pipeline using synchronous mode:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStartPipelineOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_start_pipeline]
@@ -166,12 +166,25 @@ To start Data Fusion pipeline using synchronous mode:
 To start Data Fusion pipeline using asynchronous mode:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStartPipelineOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_start_pipeline_async]
     :end-before: [END howto_cloud_data_fusion_start_pipeline_async]
 
+There is a possibility to start Data Fusion pipeline asynchronously using 
deferrable mode. While the asynchronous
+parameter makes it possible to wait until the DataFusion pipeline reaches a 
terminal state using the synchronous
+sleep() method, deferrable mode checks for the state using asynchronous calls.
+It is not possible to use both the asynchronous and deferrable parameters at the 
same time.
+Please check the example of using deferrable mode:
+:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStartPipelineOperator`.
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_cloud_data_fusion_start_pipeline_def]
+    :end-before: [END howto_cloud_data_fusion_start_pipeline_def]
+
 You can use :ref:`Jinja templating <concepts:jinja-templating>` with
 
:template-fields:`airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStartPipelineOperator`
 parameters which allows you to dynamically determine values.
@@ -185,7 +198,7 @@ Stop a DataFusion pipeline
 To stop Data Fusion pipeline use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionStopPipelineOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_stop_pipeline]
@@ -204,7 +217,7 @@ Delete a DataFusion pipeline
 To delete Data Fusion pipeline use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionDeletePipelineOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_delete_pipeline]
@@ -224,7 +237,7 @@ List DataFusion pipelines
 To list Data Fusion pipelines use:
 
:class:`~airflow.providers.google.cloud.operators.datafusion.CloudDataFusionListPipelinesOperator`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_list_pipelines]
@@ -242,7 +255,7 @@ When start pipeline is triggered asynchronously sensors may 
be used to run check
 
 
:class:`~airflow.providers.google.cloud.sensors.datafusion.CloudDataFusionPipelineStateSensor`.
 
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_datafusion.py
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/datafusion/example_datafusion.py
     :language: python
     :dedent: 4
     :start-after: [START howto_cloud_data_fusion_start_pipeline_sensor]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 28a34b76dc..397ab5e699 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -347,6 +347,9 @@ DataFrame
 Dataframe
 dataframe
 dataframes
+DataFusion
+Datafusion
+datafusion
 Datalake
 Datanodes
 datapipe
diff --git a/tests/providers/google/cloud/hooks/test_datafusion.py 
b/tests/providers/google/cloud/hooks/test_datafusion.py
index e5fe109824..1b2e3ccc57 100644
--- a/tests/providers/google/cloud/hooks/test_datafusion.py
+++ b/tests/providers/google/cloud/hooks/test_datafusion.py
@@ -17,26 +17,44 @@
 from __future__ import annotations
 
 import json
-from unittest import mock
+import sys
 
+import aiohttp
 import pytest
+from aiohttp.helpers import TimerNoop
+from yarl import URL
 
 from airflow import AirflowException
-from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
+from airflow.providers.google.cloud.hooks.datafusion import 
DataFusionAsyncHook, DataFusionHook
 from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
 
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
 API_VERSION = "v1beta1"
 GCP_CONN_ID = "google_cloud_default"
 HOOK_STR = "airflow.providers.google.cloud.hooks.datafusion.{}"
 LOCATION = "test-location"
 INSTANCE_NAME = "airflow-test-instance"
+INSTANCE_URL = "http://datafusion.instance.com";
 INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
+NAMESPACE = "default"
 PROJECT_ID = "test_project_id"
 PIPELINE_NAME = "shrubberyPipeline"
 PIPELINE_ID = "123"
 PIPELINE = {"test": "pipeline"}
-INSTANCE_URL = "http://datafusion.instance.com";
 RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
+CONSTRUCTED_PIPELINE_URL = (
+    f"{INSTANCE_URL}/v3/namespaces/{NAMESPACE}/apps/{PIPELINE_NAME}"
+    f"/workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}"
+)
+CONSTRUCTED_PIPELINE_URL_GET = (
+    f"https://{INSTANCE_NAME}-{PROJECT_ID}-dot-eun1.datafusion.";
+    f"googleusercontent.com/api/v3/namespaces/{NAMESPACE}/apps/{PIPELINE_NAME}"
+    f"/workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}"
+)
 
 
 @pytest.fixture
@@ -48,6 +66,19 @@ def hook():
         yield DataFusionHook(gcp_conn_id=GCP_CONN_ID)
 
 
[email protected]
+def hook_async():
+    with mock.patch(
+        
"airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook.__init__",
+        new=mock_base_gcp_hook_default_project_id,
+    ):
+        yield DataFusionAsyncHook()
+
+
+def session():
+    return mock.Mock()
+
+
 class TestDataFusionHook:
     @staticmethod
     def mock_endpoint(get_conn_mock):
@@ -168,7 +199,7 @@ class TestDataFusionHook:
         with pytest.raises(
             AirflowException,
             match=r"Empty response received. Please, check for possible root 
causes "
-            r"of this behavior either in DAG code or on Cloud Datafusion side",
+            r"of this behavior either in DAG code or on Cloud DataFusion side",
         ):
             hook.create_pipeline(pipeline_name=PIPELINE_NAME, 
pipeline=PIPELINE, instance_url=INSTANCE_URL)
         mock_request.assert_called_once_with(
@@ -205,7 +236,7 @@ class TestDataFusionHook:
         with pytest.raises(
             AirflowException,
             match=r"Empty response received. Please, check for possible root 
causes "
-            r"of this behavior either in DAG code or on Cloud Datafusion side",
+            r"of this behavior either in DAG code or on Cloud DataFusion side",
         ):
             hook.delete_pipeline(pipeline_name=PIPELINE_NAME, 
instance_url=INSTANCE_URL)
         mock_request.assert_called_once_with(
@@ -243,7 +274,7 @@ class TestDataFusionHook:
         with pytest.raises(
             AirflowException,
             match=r"Empty response received. Please, check for possible root 
causes "
-            r"of this behavior either in DAG code or on Cloud Datafusion side",
+            r"of this behavior either in DAG code or on Cloud DataFusion side",
         ):
             hook.list_pipelines(instance_url=INSTANCE_URL)
         mock_request.assert_called_once_with(
@@ -284,7 +315,7 @@ class TestDataFusionHook:
         with pytest.raises(
             AirflowException,
             match=r"Empty response received. Please, check for possible root 
causes "
-            r"of this behavior either in DAG code or on Cloud Datafusion side",
+            r"of this behavior either in DAG code or on Cloud DataFusion side",
         ):
             hook.start_pipeline(
                 pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, 
runtime_args=RUNTIME_ARGS
@@ -337,7 +368,7 @@ class TestDataFusionHook:
         with pytest.raises(
             AirflowException,
             match=r"Empty response received. Please, check for possible root 
causes "
-            r"of this behavior either in DAG code or on Cloud Datafusion side",
+            r"of this behavior either in DAG code or on Cloud DataFusion side",
         ):
             hook.stop_pipeline(pipeline_name=PIPELINE_NAME, 
instance_url=INSTANCE_URL)
         mock_request.assert_called_once_with(
@@ -377,7 +408,7 @@ class TestDataFusionHook:
         with pytest.raises(
             AirflowException,
             match=r"Empty response received. Please, check for possible root 
causes "
-            r"of this behavior either in DAG code or on Cloud Datafusion side",
+            r"of this behavior either in DAG code or on Cloud DataFusion side",
         ):
             hook.get_pipeline_workflow(
                 pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, 
pipeline_id=PIPELINE_ID
@@ -400,3 +431,112 @@ class TestDataFusionHook:
             f"workflows/DataPipelineWorkflow/runs/{PIPELINE_ID}",
             method="GET",
         )
+
+
+class TestDataFusionHookAsynch:
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STR.format("DataFusionAsyncHook._get_link"))
+    async def test_async_get_pipeline_should_execute_successfully(self, 
mocked_link, hook_async):
+        await hook_async.get_pipeline(
+            instance_url=INSTANCE_URL,
+            namespace=NAMESPACE,
+            pipeline_name=PIPELINE_NAME,
+            pipeline_id=PIPELINE_ID,
+            session=session,
+        )
+        mocked_link.assert_awaited_once_with(url=CONSTRUCTED_PIPELINE_URL, 
session=session)
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STR.format("DataFusionAsyncHook.get_pipeline"))
+    async def 
test_async_get_pipeline_status_completed_should_execute_successfully(
+        self, mocked_get, hook_async
+    ):
+        response = aiohttp.ClientResponse(
+            "get",
+            URL(CONSTRUCTED_PIPELINE_URL_GET),
+            request_info=mock.Mock(),
+            writer=mock.Mock(),
+            continue100=None,
+            timer=TimerNoop(),
+            traces=[],
+            loop=mock.Mock(),
+            session=session,
+        )
+        response.status = 200
+        mocked_get.return_value = response
+        mocked_get.return_value._headers = {"Authorization": "some-token"}
+        mocked_get.return_value._body = b'{"status": "COMPLETED"}'
+
+        pipeline_status = await hook_async.get_pipeline_status(
+            pipeline_name=PIPELINE_NAME,
+            instance_url=INSTANCE_URL,
+            pipeline_id=PIPELINE_ID,
+            namespace=NAMESPACE,
+        )
+        mocked_get.assert_awaited_once()
+        assert pipeline_status == "success"
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STR.format("DataFusionAsyncHook.get_pipeline"))
+    async def 
test_async_get_pipeline_status_running_should_execute_successfully(
+        self, mocked_get, hook_async
+    ):
+        """Assets that the DataFusionAsyncHook returns pending response when 
job is still in running state"""
+        response = aiohttp.ClientResponse(
+            "get",
+            URL(CONSTRUCTED_PIPELINE_URL_GET),
+            request_info=mock.Mock(),
+            writer=mock.Mock(),
+            continue100=None,
+            timer=TimerNoop(),
+            traces=[],
+            loop=mock.Mock(),
+            session=session,
+        )
+        response.status = 200
+        mocked_get.return_value = response
+        mocked_get.return_value._headers = {"Authorization": "some-token"}
+        mocked_get.return_value._body = b'{"status": "RUNNING"}'
+
+        pipeline_status = await hook_async.get_pipeline_status(
+            pipeline_name=PIPELINE_NAME,
+            instance_url=INSTANCE_URL,
+            pipeline_id=PIPELINE_ID,
+            namespace=NAMESPACE,
+        )
+        mocked_get.assert_awaited_once()
+        assert pipeline_status == "pending"
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STR.format("DataFusionAsyncHook.get_pipeline"))
+    async def 
test_async_get_pipeline_status_os_error_should_execute_successfully(
+        self, mocked_get, hook_async
+    ):
+        """Assets that the DataFusionAsyncHook returns a pending response when 
OSError is raised"""
+        mocked_get.side_effect = OSError()
+
+        pipeline_status = await hook_async.get_pipeline_status(
+            pipeline_name=PIPELINE_NAME,
+            instance_url=INSTANCE_URL,
+            pipeline_id=PIPELINE_ID,
+            namespace=NAMESPACE,
+        )
+        mocked_get.assert_awaited_once()
+        assert pipeline_status == "pending"
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STR.format("DataFusionAsyncHook.get_pipeline"))
+    async def 
test_async_get_pipeline_status_exception_should_execute_successfully(
+        self, mocked_get, hook_async, caplog
+    ):
+        """Assets that the logging is done correctly when DataFusionAsyncHook 
raises Exception"""
+        mocked_get.side_effect = Exception()
+
+        await hook_async.get_pipeline_status(
+            pipeline_name=PIPELINE_NAME,
+            instance_url=INSTANCE_URL,
+            pipeline_id=PIPELINE_ID,
+            namespace=NAMESPACE,
+        )
+        mocked_get.assert_awaited_once()
+        assert "Retrieving pipeline status finished with errors..." in 
caplog.text
diff --git a/tests/providers/google/cloud/operators/test_datafusion.py 
b/tests/providers/google/cloud/operators/test_datafusion.py
index 48278d3ec5..2950fceb7d 100644
--- a/tests/providers/google/cloud/operators/test_datafusion.py
+++ b/tests/providers/google/cloud/operators/test_datafusion.py
@@ -18,7 +18,10 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pytest
+
 from airflow import DAG
+from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.providers.google.cloud.hooks.datafusion import SUCCESS_STATES, 
PipelineStates
 from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionCreateInstanceOperator,
@@ -32,15 +35,18 @@ from airflow.providers.google.cloud.operators.datafusion 
import (
     CloudDataFusionStopPipelineOperator,
     CloudDataFusionUpdateInstanceOperator,
 )
+from airflow.providers.google.cloud.triggers.datafusion import 
DataFusionStartPipelineTrigger
 
 HOOK_STR = "airflow.providers.google.cloud.operators.datafusion.DataFusionHook"
 
+TASK_ID = "test_task"
 LOCATION = "test-location"
 INSTANCE_NAME = "airflow-test-instance"
 INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
 PROJECT_ID = "test_project_id"
 PIPELINE_NAME = "shrubberyPipeline"
 PIPELINE = {"test": "pipeline"}
+PIPELINE_ID = "test_pipeline_id"
 INSTANCE_URL = "http://datafusion.instance.com";
 NAMESPACE = "TEST_NAMESPACE"
 RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
@@ -48,7 +54,7 @@ RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
 
 class TestCloudDataFusionUpdateInstanceOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         update_maks = "instance.name"
         op = CloudDataFusionUpdateInstanceOperator(
             task_id="test_tasks",
@@ -72,7 +78,7 @@ class TestCloudDataFusionUpdateInstanceOperator:
 
 class TestCloudDataFusionRestartInstanceOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         op = CloudDataFusionRestartInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
@@ -89,7 +95,7 @@ class TestCloudDataFusionRestartInstanceOperator:
 
 class TestCloudDataFusionCreateInstanceOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         op = CloudDataFusionCreateInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
@@ -110,7 +116,7 @@ class TestCloudDataFusionCreateInstanceOperator:
 
 class TestCloudDataFusionDeleteInstanceOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         op = CloudDataFusionDeleteInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
@@ -127,7 +133,7 @@ class TestCloudDataFusionDeleteInstanceOperator:
 
 class TestCloudDataFusionGetInstanceOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         op = CloudDataFusionGetInstanceOperator(
             task_id="test_tasks",
             instance_name=INSTANCE_NAME,
@@ -143,7 +149,7 @@ class TestCloudDataFusionGetInstanceOperator:
 
 class TestCloudDataFusionCreatePipelineOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         mock_hook.return_value.get_instance.return_value = {
             "apiEndpoint": INSTANCE_URL,
             "serviceEndpoint": INSTANCE_URL,
@@ -172,7 +178,7 @@ class TestCloudDataFusionCreatePipelineOperator:
 
 class TestCloudDataFusionDeletePipelineOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         mock_hook.return_value.get_instance.return_value = {
             "apiEndpoint": INSTANCE_URL,
             "serviceEndpoint": INSTANCE_URL,
@@ -201,8 +207,7 @@ class TestCloudDataFusionDeletePipelineOperator:
 
 class TestCloudDataFusionStartPipelineOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
-        PIPELINE_ID = "test_pipeline_id"
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         mock_hook.return_value.get_instance.return_value = {
             "apiEndpoint": INSTANCE_URL,
             "serviceEndpoint": INSTANCE_URL,
@@ -210,7 +215,7 @@ class TestCloudDataFusionStartPipelineOperator:
         mock_hook.return_value.start_pipeline.return_value = PIPELINE_ID
 
         op = CloudDataFusionStartPipelineOperator(
-            task_id="test_task",
+            task_id=TASK_ID,
             pipeline_name=PIPELINE_NAME,
             instance_name=INSTANCE_NAME,
             namespace=NAMESPACE,
@@ -242,8 +247,7 @@ class TestCloudDataFusionStartPipelineOperator:
         )
 
     @mock.patch(HOOK_STR)
-    def test_execute_async(self, mock_hook):
-        PIPELINE_ID = "test_pipeline_id"
+    def 
test_execute_check_hook_call_asynch_param_should_execute_successfully(self, 
mock_hook):
         mock_hook.return_value.get_instance.return_value = {
             "apiEndpoint": INSTANCE_URL,
             "serviceEndpoint": INSTANCE_URL,
@@ -251,7 +255,7 @@ class TestCloudDataFusionStartPipelineOperator:
         mock_hook.return_value.start_pipeline.return_value = PIPELINE_ID
 
         op = CloudDataFusionStartPipelineOperator(
-            task_id="test_task",
+            task_id=TASK_ID,
             pipeline_name=PIPELINE_NAME,
             instance_name=INSTANCE_NAME,
             namespace=NAMESPACE,
@@ -261,12 +265,110 @@ class TestCloudDataFusionStartPipelineOperator:
             asynchronous=True,
         )
         op.dag = mock.MagicMock(spec=DAG, task_dict={}, dag_id="test")
-
         op.execute(context=mock.MagicMock())
+
         mock_hook.return_value.get_instance.assert_called_once_with(
             instance_name=INSTANCE_NAME, location=LOCATION, 
project_id=PROJECT_ID
         )
+        mock_hook.return_value.start_pipeline.assert_called_once_with(
+            instance_url=INSTANCE_URL,
+            pipeline_name=PIPELINE_NAME,
+            namespace=NAMESPACE,
+            runtime_args=RUNTIME_ARGS,
+        )
+        mock_hook.return_value.wait_for_pipeline_state.assert_not_called()
+
+
+class TestCloudDataFusionStartPipelineOperatorAsynch:
+    @mock.patch(HOOK_STR)
+    def test_asynch_execute_should_execute_successfully(self, mock_hook):
+        """
+        Asserts that a task is deferred and a DataFusionStartPipelineTrigger 
will be fired
+        when the CloudDataFusionStartPipelineOperator is executed in 
deferrable mode (deferrable=True).
+        """
+
+        op = CloudDataFusionStartPipelineOperator(
+            task_id=TASK_ID,
+            pipeline_name=PIPELINE_NAME,
+            instance_name=INSTANCE_NAME,
+            namespace=NAMESPACE,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            runtime_args=RUNTIME_ARGS,
+            deferrable=True,
+        )
+        op.dag = mock.MagicMock(spec=DAG, task_dict={}, dag_id="test")
+        with pytest.raises(TaskDeferred) as exc:
+            op.execute(context=mock.MagicMock())
+
+        assert isinstance(
+            exc.value.trigger, DataFusionStartPipelineTrigger
+        ), "Trigger is not a DataFusionStartPipelineTrigger"
+
+    def test_asynch_execute_should_should_throw_exception(self):
+        """Tests that an AirflowException is raised in case of error event"""
+
+        op = CloudDataFusionStartPipelineOperator(
+            task_id=TASK_ID,
+            pipeline_name=PIPELINE_NAME,
+            instance_name=INSTANCE_NAME,
+            namespace=NAMESPACE,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            runtime_args=RUNTIME_ARGS,
+            deferrable=True,
+        )
+        with pytest.raises(AirflowException):
+            op.execute_complete(
+                context=mock.MagicMock(), event={"status": "error", "message": 
"test failure message"}
+            )
 
+    def test_asynch_execute_logging_should_execute_successfully(self):
+        """Asserts that logging occurs as expected"""
+
+        op = CloudDataFusionStartPipelineOperator(
+            task_id=TASK_ID,
+            pipeline_name=PIPELINE_NAME,
+            instance_name=INSTANCE_NAME,
+            namespace=NAMESPACE,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            runtime_args=RUNTIME_ARGS,
+            deferrable=True,
+        )
+        with mock.patch.object(op.log, "info") as mock_log_info:
+            op.execute_complete(
+                context=mock.MagicMock(),
+                event={"status": "success", "message": "Pipeline completed", 
"pipeline_id": PIPELINE_ID},
+            )
+        mock_log_info.assert_called_with("%s completed with response %s ", 
TASK_ID, "Pipeline completed")
+
+    @mock.patch(HOOK_STR)
+    def test_asynch_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
+        mock_hook.return_value.get_instance.return_value = {
+            "apiEndpoint": INSTANCE_URL,
+            "serviceEndpoint": INSTANCE_URL,
+        }
+        mock_hook.return_value.start_pipeline.return_value = PIPELINE_ID
+
+        op = CloudDataFusionStartPipelineOperator(
+            task_id=TASK_ID,
+            pipeline_name=PIPELINE_NAME,
+            instance_name=INSTANCE_NAME,
+            namespace=NAMESPACE,
+            location=LOCATION,
+            project_id=PROJECT_ID,
+            runtime_args=RUNTIME_ARGS,
+            deferrable=True,
+        )
+        op.dag = mock.MagicMock(spec=DAG, task_dict={}, dag_id="test")
+        with pytest.raises(TaskDeferred):
+            result_pipeline_id = op.execute(context=mock.MagicMock())
+            assert result_pipeline_id == PIPELINE_ID
+
+        mock_hook.return_value.get_instance.assert_called_once_with(
+            instance_name=INSTANCE_NAME, location=LOCATION, 
project_id=PROJECT_ID
+        )
         mock_hook.return_value.start_pipeline.assert_called_once_with(
             instance_url=INSTANCE_URL,
             pipeline_name=PIPELINE_NAME,
@@ -274,12 +376,35 @@ class TestCloudDataFusionStartPipelineOperator:
             runtime_args=RUNTIME_ARGS,
         )
 
-        mock_hook.return_value.wait_for_pipeline_state.assert_not_called()
+    @mock.patch(HOOK_STR)
+    def 
test_execute_check_hook_call_asynch_param_should_execute_successfully(self, 
mock_hook):
+        mock_hook.return_value.get_instance.return_value = {
+            "apiEndpoint": INSTANCE_URL,
+            "serviceEndpoint": INSTANCE_URL,
+        }
+        mock_hook.return_value.start_pipeline.return_value = PIPELINE_ID
+        with pytest.raises(
+            AirflowException,
+            match=r"Both asynchronous and deferrable parameters were passed. 
Please, provide only one.",
+        ):
+            op = CloudDataFusionStartPipelineOperator(
+                task_id=TASK_ID,
+                pipeline_name=PIPELINE_NAME,
+                instance_name=INSTANCE_NAME,
+                namespace=NAMESPACE,
+                location=LOCATION,
+                project_id=PROJECT_ID,
+                runtime_args=RUNTIME_ARGS,
+                asynchronous=True,
+                deferrable=True,
+            )
+            op.dag = mock.MagicMock(spec=DAG, task_dict={}, dag_id="test")
+            op.execute(context=mock.MagicMock())
 
 
 class TestCloudDataFusionStopPipelineOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         mock_hook.return_value.get_instance.return_value = {
             "apiEndpoint": INSTANCE_URL,
             "serviceEndpoint": INSTANCE_URL,
@@ -304,7 +429,7 @@ class TestCloudDataFusionStopPipelineOperator:
 
 class TestCloudDataFusionListPipelinesOperator:
     @mock.patch(HOOK_STR)
-    def test_execute(self, mock_hook):
+    def test_execute_check_hook_call_should_execute_successfully(self, 
mock_hook):
         artifact_version = "artifact_version"
         artifact_name = "artifact_name"
         mock_hook.return_value.get_instance.return_value = {
diff --git a/tests/providers/google/cloud/operators/test_datafusion_system.py 
b/tests/providers/google/cloud/operators/test_datafusion_system.py
deleted file mode 100644
index e98ef04cf7..0000000000
--- a/tests/providers/google/cloud/operators/test_datafusion_system.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_datafusion import 
BUCKET_1, BUCKET_2
-from tests.providers.google.cloud.utils.gcp_authenticator import 
GCP_DATAFUSION_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, 
GoogleSystemTest, provide_gcp_context
-
-
[email protected]("mysql", "postgres")
[email protected]_file(GCP_DATAFUSION_KEY)
-class TestCloudDataFusionExampleDagsSystem(GoogleSystemTest):
-    def setup_method(self) -> None:
-        self.create_gcs_bucket(name=BUCKET_1)
-        self.create_gcs_bucket(name=BUCKET_2)
-
-    def teardown_method(self) -> None:
-        self.delete_gcs_bucket(name=BUCKET_1)
-        self.delete_gcs_bucket(name=BUCKET_2)
-
-    @provide_gcp_context(GCP_DATAFUSION_KEY)
-    def test_run_example_dag_function(self):
-        self.run_dag("example_data_fusion", CLOUD_DAG_FOLDER)
diff --git a/tests/providers/google/cloud/sensors/test_datafusion.py 
b/tests/providers/google/cloud/sensors/test_datafusion.py
index c290c4ff26..195f020681 100644
--- a/tests/providers/google/cloud/sensors/test_datafusion.py
+++ b/tests/providers/google/cloud/sensors/test_datafusion.py
@@ -23,7 +23,7 @@ from unittest import mock
 import pytest
 from parameterized.parameterized import parameterized
 
-from airflow import AirflowException
+from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.providers.google.cloud.hooks.datafusion import PipelineStates
 from airflow.providers.google.cloud.sensors.datafusion import 
CloudDataFusionPipelineStateSensor
 
@@ -102,3 +102,28 @@ class 
TestCloudDataFusionPipelineStateSensor(unittest.TestCase):
         ):
             mock_hook.return_value.get_pipeline_workflow.return_value = 
{"status": "FAILED"}
             task.poke(mock.MagicMock())
+
+    
@mock.patch("airflow.providers.google.cloud.sensors.datafusion.DataFusionHook")
+    def test_not_found_exception(self, mock_hook):
+        mock_hook.return_value.get_instance.return_value = {"apiEndpoint": 
INSTANCE_URL}
+        mock_hook.return_value.get_pipeline_workflow.side_effect = 
AirflowNotFoundException()
+
+        task = CloudDataFusionPipelineStateSensor(
+            task_id="test_task_id",
+            pipeline_name=PIPELINE_NAME,
+            pipeline_id=PIPELINE_ID,
+            project_id=PROJECT_ID,
+            expected_statuses={PipelineStates.COMPLETED},
+            failure_statuses=FAILURE_STATUSES,
+            instance_name=INSTANCE_NAME,
+            location=LOCATION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        with pytest.raises(
+            AirflowException,
+            match="Specified Pipeline ID was not found.",
+        ):
+            task.poke(mock.MagicMock())
diff --git a/tests/providers/google/cloud/triggers/test_datafusion.py 
b/tests/providers/google/cloud/triggers/test_datafusion.py
new file mode 100644
index 0000000000..907cec84fd
--- /dev/null
+++ b/tests/providers/google/cloud/triggers/test_datafusion.py
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import logging
+import sys
+
+import pytest
+
+from airflow.providers.google.cloud.triggers.datafusion import 
DataFusionStartPipelineTrigger
+from airflow.triggers.base import TriggerEvent
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+HOOK_STATUS_STR = 
"airflow.providers.google.cloud.hooks.datafusion.DataFusionAsyncHook.get_pipeline_status"
+CLASSPATH = 
"airflow.providers.google.cloud.triggers.datafusion.DataFusionStartPipelineTrigger"
+
+TASK_ID = "test_task"
+LOCATION = "test-location"
+INSTANCE_NAME = "airflow-test-instance"
+INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
+PROJECT_ID = "test_project_id"
+PIPELINE_NAME = "shrubberyPipeline"
+PIPELINE = {"test": "pipeline"}
+PIPELINE_ID = "test_pipeline_id"
+INSTANCE_URL = "http://datafusion.instance.com";
+NAMESPACE = "TEST_NAMESPACE"
+RUNTIME_ARGS = {"arg1": "a", "arg2": "b"}
+TEST_POLL_INTERVAL = 4.0
+TEST_GCP_PROJECT_ID = "test-project"
+
+
[email protected]
+def trigger():
+    return DataFusionStartPipelineTrigger(
+        instance_url=INSTANCE_URL,
+        namespace=NAMESPACE,
+        pipeline_name=PIPELINE_NAME,
+        pipeline_id=PIPELINE_ID,
+        poll_interval=TEST_POLL_INTERVAL,
+        gcp_conn_id=TEST_GCP_PROJECT_ID,
+    )
+
+
+class TestDataFusionStartPipelineTrigger:
+    def 
test_start_pipeline_trigger_serialization_should_execute_successfully(self, 
trigger):
+        """
+        Asserts that the DataFusionStartPipelineTrigger correctly serializes 
its arguments
+        and classpath.
+        """
+        classpath, kwargs = trigger.serialize()
+        assert classpath == CLASSPATH
+        assert kwargs == {
+            "instance_url": INSTANCE_URL,
+            "namespace": NAMESPACE,
+            "pipeline_name": PIPELINE_NAME,
+            "pipeline_id": PIPELINE_ID,
+            "gcp_conn_id": TEST_GCP_PROJECT_ID,
+            "success_states": None,
+        }
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STATUS_STR)
+    async def 
test_start_pipeline_trigger_on_success_should_execute_successfully(
+        self, mock_pipeline_status, trigger
+    ):
+        """
+        Tests the DataFusionStartPipelineTrigger only fires once the job 
execution reaches a successful state.
+        """
+        mock_pipeline_status.return_value = "success"
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert (
+            TriggerEvent({"status": "success", "message": "Pipeline is 
running", "pipeline_id": PIPELINE_ID})
+            == actual
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STATUS_STR)
+    async def test_start_pipeline_trigger_running_should_execute_successfully(
+        self, mock_pipeline_status, trigger, caplog
+    ):
+        """
+        Test that DataFusionStartPipelineTrigger does not fire while a job is 
still running.
+        """
+
+        mock_pipeline_status.return_value = "pending"
+        caplog.set_level(logging.INFO)
+        task = asyncio.create_task(trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+
+        # TriggerEvent was not returned
+        assert task.done() is False
+
+        assert "Pipeline is not still in running state..." in caplog.text
+        assert f"Sleeping for {TEST_POLL_INTERVAL} seconds." in caplog.text
+
+        # Prevents error when task is destroyed while in "pending" state
+        asyncio.get_event_loop().stop()
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STATUS_STR)
+    async def test_start_pipeline_trigger_error_should_execute_successfully(
+        self, mock_pipeline_status, trigger
+    ):
+        """
+        Test that DataFusionStartPipelineTrigger fires the correct event in 
case of an error.
+        """
+        mock_pipeline_status.return_value = "error"
+
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "error"}) == actual
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STATUS_STR)
+    async def 
test_start_pipeline_trigger_exception_should_execute_successfully(
+        self, mock_pipeline_status, trigger
+    ):
+        """
+        Test that DataFusionStartPipelineTrigger fires the correct event in case of an exception raised while polling.
+        """
+        mock_pipeline_status.side_effect = Exception("Test exception")
+
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) 
== actual
diff --git a/tests/system/providers/google/cloud/datafusion/__init__.py 
b/tests/system/providers/google/cloud/datafusion/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/datafusion/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/google/cloud/example_dags/example_datafusion.py 
b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
similarity index 88%
copy from airflow/providers/google/cloud/example_dags/example_datafusion.py
copy to tests/system/providers/google/cloud/datafusion/example_datafusion.py
index 24b0a9b239..4ff221c928 100644
--- a/airflow/providers/google/cloud/example_dags/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion.py
@@ -36,10 +36,12 @@ from airflow.providers.google.cloud.operators.datafusion 
import (
     CloudDataFusionStopPipelineOperator,
     CloudDataFusionUpdateInstanceOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator
 from airflow.providers.google.cloud.sensors.datafusion import 
CloudDataFusionPipelineStateSensor
 
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 LOCATION = "europe-north1"
 INSTANCE_NAME = "airflow-test-instance"
 INSTANCE = {
@@ -48,19 +50,17 @@ INSTANCE = {
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_1 = os.environ.get("GCP_DATAFUSION_BUCKET_1", 
"test-datafusion-bucket-1")
-BUCKET_2 = os.environ.get("GCP_DATAFUSION_BUCKET_2", 
"test-datafusion-bucket-2")
-
+BUCKET_1 = "test-datafusion-1"
+BUCKET_2 = "test-datafusion-2"
 BUCKET_1_URI = f"gs://{BUCKET_1}"
 BUCKET_2_URI = f"gs://{BUCKET_2}"
 
-PIPELINE_NAME = os.environ.get("GCP_DATAFUSION_PIPELINE_NAME", "airflow_test")
+PIPELINE_NAME = "test-pipe"
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
-        "version": "6.5.1",
+        "version": "6.7.2",
         "scope": "SYSTEM",
-        "label": "Data Pipeline - System Test",
     },
     "description": "Data Pipeline Application",
     "name": "test-pipe",
@@ -80,7 +80,7 @@ PIPELINE = {
                     "name": "GCSFile",
                     "type": "batchsource",
                     "label": "GCS",
-                    "artifact": {"name": "google-cloud", "version": "0.18.1", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "format": "text",
@@ -109,7 +109,7 @@ PIPELINE = {
                     "name": "GCS",
                     "type": "batchsink",
                     "label": "GCS2",
-                    "artifact": {"name": "google-cloud", "version": "0.18.1", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "suffix": "yyyy-MM-dd-HH-mm",
@@ -145,12 +145,23 @@ PIPELINE = {
 }
 # [END howto_data_fusion_env_variables]
 
-
 with models.DAG(
     "example_data_fusion",
     start_date=datetime(2021, 1, 1),
     catchup=False,
 ) as dag:
+    create_bucket1 = GCSCreateBucketOperator(
+        task_id="create_bucket1",
+        bucket_name=BUCKET_1,
+        project_id=PROJECT_ID,
+    )
+
+    create_bucket2 = GCSCreateBucketOperator(
+        task_id="create_bucket2",
+        bucket_name=BUCKET_2,
+        project_id=PROJECT_ID,
+    )
+
     # [START howto_cloud_data_fusion_create_instance_operator]
     create_instance = CloudDataFusionCreateInstanceOperator(
         location=LOCATION,
@@ -215,7 +226,6 @@ with models.DAG(
         asynchronous=True,
         task_id="start_pipeline_async",
     )
-
     # [END howto_cloud_data_fusion_start_pipeline_async]
 
     # [START howto_cloud_data_fusion_start_pipeline_sensor]
@@ -257,19 +267,35 @@ with models.DAG(
     # Add sleep before creating pipeline
     sleep = BashOperator(task_id="sleep", bash_command="sleep 60")
 
-    create_instance >> get_instance >> restart_instance >> update_instance >> 
sleep
+    # Add sleep before deleting pipeline
+    sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
+
     (
-        sleep
+        create_bucket1
+        >> create_bucket2
+        >> create_instance
+        >> get_instance
+        >> restart_instance
+        >> update_instance
+        >> sleep
         >> create_pipeline
         >> list_pipelines
         >> start_pipeline_async
         >> start_pipeline_sensor
         >> start_pipeline
         >> stop_pipeline
+        >> sleep_30
         >> delete_pipeline
+        >> delete_instance
     )
-    delete_pipeline >> delete_instance
 
-if __name__ == "__main__":
-    dag.clear()
-    dag.run()
+    from tests.system.utils.watcher import watcher
+
+# This test needs watcher in order to properly mark success/failure
+# when "tearDown" task with trigger rule is part of the DAG
+list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/google/cloud/example_dags/example_datafusion.py 
b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
similarity index 83%
rename from airflow/providers/google/cloud/example_dags/example_datafusion.py
rename to 
tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
index 24b0a9b239..f8c345c735 100644
--- a/airflow/providers/google/cloud/example_dags/example_datafusion.py
+++ b/tests/system/providers/google/cloud/datafusion/example_datafusion_async.py
@@ -36,10 +36,11 @@ from airflow.providers.google.cloud.operators.datafusion 
import (
     CloudDataFusionStopPipelineOperator,
     CloudDataFusionUpdateInstanceOperator,
 )
-from airflow.providers.google.cloud.sensors.datafusion import 
CloudDataFusionPipelineStateSensor
+from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator
 
 # [START howto_data_fusion_env_variables]
 SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 LOCATION = "europe-north1"
 INSTANCE_NAME = "airflow-test-instance"
 INSTANCE = {
@@ -48,19 +49,17 @@ INSTANCE = {
     "dataprocServiceAccount": SERVICE_ACCOUNT,
 }
 
-BUCKET_1 = os.environ.get("GCP_DATAFUSION_BUCKET_1", 
"test-datafusion-bucket-1")
-BUCKET_2 = os.environ.get("GCP_DATAFUSION_BUCKET_2", 
"test-datafusion-bucket-2")
-
+BUCKET_1 = "test-datafusion-1"
+BUCKET_2 = "test-datafusion-2"
 BUCKET_1_URI = f"gs://{BUCKET_1}"
 BUCKET_2_URI = f"gs://{BUCKET_2}"
 
-PIPELINE_NAME = os.environ.get("GCP_DATAFUSION_PIPELINE_NAME", "airflow_test")
+PIPELINE_NAME = "test-pipe"
 PIPELINE = {
     "artifact": {
         "name": "cdap-data-pipeline",
-        "version": "6.5.1",
+        "version": "6.7.2",
         "scope": "SYSTEM",
-        "label": "Data Pipeline - System Test",
     },
     "description": "Data Pipeline Application",
     "name": "test-pipe",
@@ -80,7 +79,7 @@ PIPELINE = {
                     "name": "GCSFile",
                     "type": "batchsource",
                     "label": "GCS",
-                    "artifact": {"name": "google-cloud", "version": "0.18.1", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "format": "text",
@@ -109,7 +108,7 @@ PIPELINE = {
                     "name": "GCS",
                     "type": "batchsink",
                     "label": "GCS2",
-                    "artifact": {"name": "google-cloud", "version": "0.18.1", 
"scope": "SYSTEM"},
+                    "artifact": {"name": "google-cloud", "version": "0.20.3", 
"scope": "SYSTEM"},
                     "properties": {
                         "project": "auto-detect",
                         "suffix": "yyyy-MM-dd-HH-mm",
@@ -145,12 +144,23 @@ PIPELINE = {
 }
 # [END howto_data_fusion_env_variables]
 
-
 with models.DAG(
-    "example_data_fusion",
+    "example_data_fusion_async",
     start_date=datetime(2021, 1, 1),
     catchup=False,
 ) as dag:
+    create_bucket1 = GCSCreateBucketOperator(
+        task_id="create_bucket1",
+        bucket_name=BUCKET_1,
+        project_id=PROJECT_ID,
+    )
+
+    create_bucket2 = GCSCreateBucketOperator(
+        task_id="create_bucket2",
+        bucket_name=BUCKET_2,
+        project_id=PROJECT_ID,
+    )
+
     # [START howto_cloud_data_fusion_create_instance_operator]
     create_instance = CloudDataFusionCreateInstanceOperator(
         location=LOCATION,
@@ -198,37 +208,15 @@ with models.DAG(
     )
     # [END howto_cloud_data_fusion_list_pipelines]
 
-    # [START howto_cloud_data_fusion_start_pipeline]
-    start_pipeline = CloudDataFusionStartPipelineOperator(
-        location=LOCATION,
-        pipeline_name=PIPELINE_NAME,
-        instance_name=INSTANCE_NAME,
-        task_id="start_pipeline",
-    )
-    # [END howto_cloud_data_fusion_start_pipeline]
-
-    # [START howto_cloud_data_fusion_start_pipeline_async]
+    # [START howto_cloud_data_fusion_start_pipeline_def]
     start_pipeline_async = CloudDataFusionStartPipelineOperator(
         location=LOCATION,
         pipeline_name=PIPELINE_NAME,
         instance_name=INSTANCE_NAME,
-        asynchronous=True,
-        task_id="start_pipeline_async",
+        deferrable=True,
+        task_id="start_pipeline_def",
     )
-
-    # [END howto_cloud_data_fusion_start_pipeline_async]
-
-    # [START howto_cloud_data_fusion_start_pipeline_sensor]
-    start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
-        task_id="pipeline_state_sensor",
-        pipeline_name=PIPELINE_NAME,
-        pipeline_id=start_pipeline_async.output,
-        expected_statuses=["COMPLETED"],
-        failure_statuses=["FAILED"],
-        instance_name=INSTANCE_NAME,
-        location=LOCATION,
-    )
-    # [END howto_cloud_data_fusion_start_pipeline_sensor]
+    # [END howto_cloud_data_fusion_start_pipeline_def]
 
     # [START howto_cloud_data_fusion_stop_pipeline]
     stop_pipeline = CloudDataFusionStopPipelineOperator(
@@ -253,23 +241,41 @@ with models.DAG(
         location=LOCATION, instance_name=INSTANCE_NAME, 
task_id="delete_instance"
     )
     # [END howto_cloud_data_fusion_delete_instance_operator]
-
+    #
     # Add sleep before creating pipeline
-    sleep = BashOperator(task_id="sleep", bash_command="sleep 60")
+    sleep_30_1 = BashOperator(task_id="sleep_30_1", bash_command="sleep 30")
+
+    # Add sleep before deleting pipeline
+    sleep_30 = BashOperator(task_id="sleep_30", bash_command="sleep 30")
+
+    # Add sleep before starting pipeline
+    sleep_20 = BashOperator(task_id="sleep_20", bash_command="sleep 20")
 
-    create_instance >> get_instance >> restart_instance >> update_instance >> 
sleep
     (
-        sleep
+        create_bucket1
+        >> create_bucket2
+        >> create_instance
+        >> get_instance
+        >> restart_instance
+        >> update_instance
+        >> sleep_30_1
         >> create_pipeline
         >> list_pipelines
+        >> sleep_20
         >> start_pipeline_async
-        >> start_pipeline_sensor
-        >> start_pipeline
         >> stop_pipeline
+        >> sleep_30
         >> delete_pipeline
+        >> delete_instance
     )
-    delete_pipeline >> delete_instance
 
-if __name__ == "__main__":
-    dag.clear()
-    dag.run()
+    from tests.system.utils.watcher import watcher
+
+# This test needs watcher in order to properly mark success/failure
+# when "tearDown" task with trigger rule is part of the DAG
+list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: 
tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

Reply via email to