pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1576203500


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:

Review Comment:
   Would like to understand when exactly the CancelledError is raised.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   Does using the sync hook method wait until the cluster is deleted? If so, it 
would block the triggerer thread. Would it be possible to have an async hook 
and method here?
   
   If it's not possible to delete async, we could send the action to worker in 
execute_complete and delete synchronously from the worker. 



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -43,20 +44,28 @@ def __init__(
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         polling_interval_seconds: int = 30,
+        delete_on_error: bool = True,
     ):
         super().__init__()
         self.region = region
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
         self.polling_interval_seconds = polling_interval_seconds
+        self.delete_on_error = delete_on_error
 
     def get_async_hook(self):
         return DataprocAsyncHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
         )
 
+    def get_sync_hook(self):

Review Comment:
   We should avoid using sync hook methods in the Triggerer as it would block 
the triggerer thread.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", 
self.cluster_name)
+                    self.log.info("Cluster deletion initiated.")
+                    async for event in self.wait_until_cluster_deleted():
+                        if event["status"] == "success":
+                            self.log.info("Cluster deletion confirmed.")
+                        elif event["status"] == "error":
+                            self.log.error("Cluster deletion failed with 
message: %s", event["message"])

Review Comment:
   Why do we have `async for` here? `self.wait_until_cluster_deleted` does not 
seem to return an async iterable.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", 
self.cluster_name)
+                    self.log.info("Cluster deletion initiated.")

Review Comment:
   These log lines are confusing. We first say the cluster has been deleted and 
then say cluster deletion was initiated.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", 
self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:

Review Comment:
   Could this block be combined into a common cleanup method, as Wei suggested, 
and also be used in the gather_diagnostics_and_delete_on_error method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to