pankajastro commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579015319


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)

Review Comment:
   can we log cluster state too?
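   For example, a minimal sketch (assuming the loop's `state` variable is still in scope at the sleep call):
   ```python
   self.log.info(
       "Current cluster state is %s. Sleeping for %s seconds.",
       state,
       self.polling_interval_seconds,
   )
   await asyncio.sleep(self.polling_interval_seconds)
   ```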



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   I think we should make an async call here. Perhaps we could periodically poll the status to verify its existence. https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-dataproc/google/cloud/dataproc_v1/services/cluster_controller/async_client.py#L1046
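   For illustration, a rough sketch of that idea (assuming the trigger's async hook exposes `delete_cluster`/`get_cluster` as used elsewhere in this PR, and that `google.api_core.exceptions.NotFound` signals the cluster is gone; `_delete_cluster_and_wait` is a hypothetical helper name):
   ```python
   from google.api_core.exceptions import NotFound

   async def _delete_cluster_and_wait(self) -> None:
       # Issue the delete through the async hook rather than the sync one.
       await self.get_async_hook().delete_cluster(
           region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
       )
       # Periodically poll until the cluster no longer exists.
       while True:
           try:
               await self.get_async_hook().get_cluster(
                   project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
               )
           except NotFound:
               self.log.info("Cluster %s has been deleted.", self.cluster_name)
               return
           await asyncio.sleep(self.polling_interval_seconds)
   ```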
 



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )

Review Comment:
   do we need this additional method?
   I think we should use it directly in the run method:
   ```python
   await self.get_async_hook().get_cluster(
       project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
   )
   ```



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):
+        """
+        Delete the cluster on error.
+
+        :param cluster: The cluster to delete.
+        """
+        if self.delete_on_error:
+            self.log.info("Deleting cluster %s.", self.cluster_name)
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            self.log.info("Cluster %s has been deleted.", self.cluster_name)
+        else:
+            self.log.info(
+                "Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name

Review Comment:
   Considering that we invoke this function only once, and only when we already know an error has occurred, I believe it's more efficient to call the hook's delete method directly. Therefore, I don't think this helper method is necessary.
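   For example, a minimal sketch of inlining it in `run` (reusing the `delete_on_error` flag and the async hook already used in this trigger):
   ```python
   if state == ClusterStatus.State.ERROR:
       if self.delete_on_error:
           self.log.info("Deleting cluster %s.", self.cluster_name)
           await self.get_async_hook().delete_cluster(
               region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
           )
           self.log.info("Cluster %s has been deleted.", self.cluster_name)
       else:
           self.log.info(
               "Cluster %s will not be deleted as delete_on_error is set to False.",
               self.cluster_name,
           )
       # ...then yield the TriggerEvent and return as in the current branch.
   ```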



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
