pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578308432


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster 
when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not 
awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through 
the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is 
deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", 
self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: 
%s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):
+        """
+        Delete the cluster on error.
+
+        :param cluster: The cluster to delete.
+        """
+        if self.delete_on_error:
+            self.log.info("Deleting cluster %s.", self.cluster_name)
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+            )
+            self.log.info("Cluster %s has been deleted.", self.cluster_name)
+        else:
+            self.log.info(
+                "Cluster %s is not be deleted as delete_on_error is set to 
False.", self.cluster_name

Review Comment:
   ```suggestion
                   "Cluster %s is not deleted as delete_on_error is set to False.", self.cluster_name
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to