pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1576203500
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
"polling_interval_seconds": self.polling_interval_seconds,
+ "delete_on_error": self.delete_on_error,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- while True:
- cluster = await self.get_async_hook().get_cluster(
- project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
+ """Run the trigger."""
+ try:
+ while True:
+ cluster = await self.fetch_cluster()
+ state = cluster.status.state
+ if state == ClusterStatus.State.ERROR:
+ await self.gather_diagnostics_and_delete_on_error(cluster)
+ break
+ elif state == ClusterStatus.State.RUNNING:
+ yield TriggerEvent(
+ {
+ "cluster_name": self.cluster_name,
+ "cluster_state": state,
+ "cluster": cluster,
+ }
+ )
+ break
+
+ self.log.info("Sleeping for %s seconds.",
self.polling_interval_seconds)
+ await asyncio.sleep(self.polling_interval_seconds)
+ except asyncio.CancelledError:
Review Comment:
I would like to understand when exactly the CancelledError is raised.
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
"polling_interval_seconds": self.polling_interval_seconds,
+ "delete_on_error": self.delete_on_error,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- while True:
- cluster = await self.get_async_hook().get_cluster(
- project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
+ """Run the trigger."""
+ try:
+ while True:
+ cluster = await self.fetch_cluster()
+ state = cluster.status.state
+ if state == ClusterStatus.State.ERROR:
+ await self.gather_diagnostics_and_delete_on_error(cluster)
+ break
+ elif state == ClusterStatus.State.RUNNING:
+ yield TriggerEvent(
+ {
+ "cluster_name": self.cluster_name,
+ "cluster_state": state,
+ "cluster": cluster,
+ }
+ )
+ break
+
+ self.log.info("Sleeping for %s seconds.",
self.polling_interval_seconds)
+ await asyncio.sleep(self.polling_interval_seconds)
+ except asyncio.CancelledError:
+ try:
+ if self.delete_on_error:
+ self.log.info("Deleting cluster %s.", self.cluster_name)
+ self.get_sync_hook().delete_cluster(
Review Comment:
Does using the sync hook method wait until the cluster is deleted? If so, it
would block the triggerer thread. Would it be possible to have an async hook
and method here?
If it's not possible to delete asynchronously, we could send the action to the
worker in execute_complete and delete synchronously from the worker.
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -43,20 +44,28 @@ def __init__(
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
polling_interval_seconds: int = 30,
+ delete_on_error: bool = True,
):
super().__init__()
self.region = region
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.polling_interval_seconds = polling_interval_seconds
+ self.delete_on_error = delete_on_error
def get_async_hook(self):
return DataprocAsyncHook(
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
)
+ def get_sync_hook(self):
Review Comment:
We should avoid using sync hook methods in the Triggerer as it would block
the triggerer thread.
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
"polling_interval_seconds": self.polling_interval_seconds,
+ "delete_on_error": self.delete_on_error,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- while True:
- cluster = await self.get_async_hook().get_cluster(
- project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
+ """Run the trigger."""
+ try:
+ while True:
+ cluster = await self.fetch_cluster()
+ state = cluster.status.state
+ if state == ClusterStatus.State.ERROR:
+ await self.gather_diagnostics_and_delete_on_error(cluster)
+ break
+ elif state == ClusterStatus.State.RUNNING:
+ yield TriggerEvent(
+ {
+ "cluster_name": self.cluster_name,
+ "cluster_state": state,
+ "cluster": cluster,
+ }
+ )
+ break
+
+ self.log.info("Sleeping for %s seconds.",
self.polling_interval_seconds)
+ await asyncio.sleep(self.polling_interval_seconds)
+ except asyncio.CancelledError:
+ try:
+ if self.delete_on_error:
+ self.log.info("Deleting cluster %s.", self.cluster_name)
+ self.get_sync_hook().delete_cluster(
+ region=self.region, cluster_name=self.cluster_name,
project_id=self.project_id
+ )
+ self.log.info("Deleted cluster %s during cancellation.",
self.cluster_name)
+ self.log.info("Cluster deletion initiated.")
+ async for event in self.wait_until_cluster_deleted():
+ if event["status"] == "success":
+ self.log.info("Cluster deletion confirmed.")
+ elif event["status"] == "error":
+ self.log.error("Cluster deletion failed with
message: %s", event["message"])
Review Comment:
Why do we have `async for` here? `self.wait_until_cluster_deleted` does not
seem to return an async iterable.
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
"polling_interval_seconds": self.polling_interval_seconds,
+ "delete_on_error": self.delete_on_error,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- while True:
- cluster = await self.get_async_hook().get_cluster(
- project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
+ """Run the trigger."""
+ try:
+ while True:
+ cluster = await self.fetch_cluster()
+ state = cluster.status.state
+ if state == ClusterStatus.State.ERROR:
+ await self.gather_diagnostics_and_delete_on_error(cluster)
+ break
+ elif state == ClusterStatus.State.RUNNING:
+ yield TriggerEvent(
+ {
+ "cluster_name": self.cluster_name,
+ "cluster_state": state,
+ "cluster": cluster,
+ }
+ )
+ break
+
+ self.log.info("Sleeping for %s seconds.",
self.polling_interval_seconds)
+ await asyncio.sleep(self.polling_interval_seconds)
+ except asyncio.CancelledError:
+ try:
+ if self.delete_on_error:
+ self.log.info("Deleting cluster %s.", self.cluster_name)
+ self.get_sync_hook().delete_cluster(
+ region=self.region, cluster_name=self.cluster_name,
project_id=self.project_id
+ )
+ self.log.info("Deleted cluster %s during cancellation.",
self.cluster_name)
+ self.log.info("Cluster deletion initiated.")
Review Comment:
These log lines are confusing. We say the cluster is deleted and then say
cluster deletion is initiated.
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
"polling_interval_seconds": self.polling_interval_seconds,
+ "delete_on_error": self.delete_on_error,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
- while True:
- cluster = await self.get_async_hook().get_cluster(
- project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
+ """Run the trigger."""
+ try:
+ while True:
+ cluster = await self.fetch_cluster()
+ state = cluster.status.state
+ if state == ClusterStatus.State.ERROR:
+ await self.gather_diagnostics_and_delete_on_error(cluster)
+ break
+ elif state == ClusterStatus.State.RUNNING:
+ yield TriggerEvent(
+ {
+ "cluster_name": self.cluster_name,
+ "cluster_state": state,
+ "cluster": cluster,
+ }
+ )
+ break
+
+ self.log.info("Sleeping for %s seconds.",
self.polling_interval_seconds)
+ await asyncio.sleep(self.polling_interval_seconds)
+ except asyncio.CancelledError:
+ try:
+ if self.delete_on_error:
Review Comment:
Could this block be combined into a common block under a cleanup method, like
Wei suggested, and used in the gather_diagnostics_and_delete_on_error method
too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]