pankajkoti commented on code in PR #39130: URL: https://github.com/apache/airflow/pull/39130#discussion_r1578308432
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):
+        """
+        Delete the cluster on error.
+
+        :param cluster: The cluster to delete.
+        """
+        if self.delete_on_error:
+            self.log.info("Deleting cluster %s.", self.cluster_name)
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            self.log.info("Cluster %s has been deleted.", self.cluster_name)
+        else:
+            self.log.info(
+                "Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name

Review Comment:
   ```suggestion
                   "Cluster %s is not deleted as delete_on_error is set to False.", self.cluster_name
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org