sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579226470


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   As mentioned in the code comment, the asynchronous deletion is not awaited
   when the task is cancelled by the user, so using the async hook here would
   not work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to