pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578308432
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
"polling_interval_seconds": self.polling_interval_seconds,
+ "delete_on_error": self.delete_on_error,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- while True:
- cluster = await self.get_async_hook().get_cluster(
- project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+ try:
+ while True:
+ cluster = await self.fetch_cluster()
+ state = cluster.status.state
+ if state == ClusterStatus.State.ERROR:
+ await self.delete_when_error_occurred(cluster)
+ yield TriggerEvent(
+ {
+ "cluster_name": self.cluster_name,
+ "cluster_state": ClusterStatus.State.DELETING,
+ "cluster": cluster,
+ }
+ )
+ return
+ elif state == ClusterStatus.State.RUNNING:
+ yield TriggerEvent(
+ {
+ "cluster_name": self.cluster_name,
+ "cluster_state": state,
+ "cluster": cluster,
+ }
+ )
+ return
+ self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+ await asyncio.sleep(self.polling_interval_seconds)
+ except asyncio.CancelledError:
+ try:
+ if self.delete_on_error:
+ self.log.info("Deleting cluster %s.", self.cluster_name)
+ # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+ # This is because the asynchronous hook deletion is not awaited when the trigger task
+ # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+ # call, which means it does not wait until the cluster is deleted.
+ self.get_sync_hook().delete_cluster(
+ region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+ )
+ self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+ except Exception as e:
+ self.log.error("Error during cancellation handling: %s", e)
+ raise AirflowException("Error during cancellation handling: %s", e)
+
+ async def fetch_cluster(self) -> Cluster:
+ """Fetch the cluster status."""
+ return await self.get_async_hook().get_cluster(
+ project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+ )
+
+ async def delete_when_error_occurred(self, cluster: Cluster):
+ """
+ Delete the cluster on error.
+
+ :param cluster: The cluster to delete.
+ """
+ if self.delete_on_error:
+ self.log.info("Deleting cluster %s.", self.cluster_name)
+ await self.get_async_hook().delete_cluster(
+ region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+ )
+ self.log.info("Cluster %s has been deleted.", self.cluster_name)
+ else:
+ self.log.info(
+ "Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name
Review Comment:
```suggestion
"Cluster %s is not deleted as delete_on_error is set to False.", self.cluster_name
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]