sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1576082062


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +148,116 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", 
self.cluster_name)
+                    self.log.info("Cluster deletion initiated, awaiting 
completion...")
+                    async for event in self.wait_until_cluster_deleted():
+                        if event["status"] == "success":
+                            self.log.info("Cluster deletion confirmed.")
+                        elif event["status"] == "error":
+                            self.log.error("Cluster deletion failed with 
message: %s", event["message"])
+                    self.log.info("Finished handling cluster deletion.")
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+
+    async def wait_until_cluster_deleted(self):
+        """Wait until the cluster is confirmed as deleted."""
+        end_time = time.time() + self.polling_interval_seconds * 10  # Set end 
time for loop
+        try:
+            while time.time() < end_time:
+                try:
+                    await self.get_async_hook().get_cluster(
+                        region=self.region,
+                        cluster_name=self.cluster_name,
+                        project_id=self.project_id,
+                    )
+                    self.log.info(
+                        "Cluster still exists. Sleeping for %s seconds.", 
self.polling_interval_seconds
+                    )
+                    await asyncio.sleep(self.polling_interval_seconds)
+                except NotFound:
+                    self.log.info("Cluster successfully deleted.")
+                    yield TriggerEvent({"status": "success", "message": 
"Cluster deleted successfully."})
+                    return
+        except Exception as e:
+            self.log.error("Error while checking for cluster deletion: %s", e)
+            yield TriggerEvent({"status": "error", "message": str(e)})
+        yield TriggerEvent(
+            {"status": "error", "message": "Timeout - cluster deletion not 
confirmed within expected time."}
+        )
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        )
+
+    async def gather_diagnostics_and_delete_on_error(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic 
information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+            )
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", 
self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",

Review Comment:
   Removed it; I have added logging instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to