pankajastro commented on code in PR #28529:
URL: https://github.com/apache/airflow/pull/28529#discussion_r1059079498
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -84,3 +84,64 @@ async def run(self):
raise AirflowException(f"Dataproc job execution failed
{self.job_id}")
await asyncio.sleep(self.polling_interval_seconds)
yield TriggerEvent({"job_id": self.job_id, "job_state": state})
+
+
+class DataprocClusterTrigger(BaseTrigger):
+ """
+ Trigger that periodically polls information from Dataproc API to verify
status.
+ Implementation leverages asynchronous transport.
+ """
+
+ def __init__(
+ self,
+ cluster_name: str,
+ region: str,
+ project_id: str | None = None,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ delegate_to: str | None = None,
+ poll_interval: int = 5,
+ ):
+ super().__init__()
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.cluster_name = cluster_name
+ self.project_id = project_id
+ self.region = region
+ self.poll_interval = poll_interval
+ self.delegate_to = delegate_to
+ self.hook = DataprocAsyncHook(
+ delegate_to=self.delegate_to,
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ def serialize(self):
+ return (
+
"airflow.providers.google.cloud.triggers.dataproc.DataprocClusterTrigger",
+ {
+ "cluster_name": self.cluster_name,
+ "project_id": self.project_id,
+ "region": self.region,
+ "gcp_conn_id": self.gcp_conn_id,
+ "delegate_to": self.delegate_to,
+ "impersonation_chain": self.impersonation_chain,
+ "poll_interval": self.poll_interval,
+ },
+ )
+
+ async def run(self):
+ while True:
+ cluster = await self.hook.get_cluster(
+ project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
+ )
+ state = cluster.status.state
+ self.log.info("Dataproc cluster: %s is in state: %s",
self.cluster_name, state)
+ if state in (
+ ClusterStatus.State.ERROR,
+ ClusterStatus.State.RUNNING,
+ ClusterStatus.State.DELETING,
Review Comment:
In the case of the DELETING state, you should wait for the cluster to be
deleted and then try to re-create it. I'm referring to the case where this
trigger is used with DataprocCreateClusterOperator.
##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -602,22 +609,50 @@ def execute(self, context: Context) -> dict:
raise
self.log.info("Cluster already exists.")
cluster = self._get_cluster(hook)
-
- # Check if cluster is not in ERROR state
- self._handle_error_state(hook, cluster)
- if cluster.status.state == cluster.status.State.CREATING:
- # Wait for cluster to be created
- cluster = self._wait_for_cluster_in_creating_state(hook)
- self._handle_error_state(hook, cluster)
- elif cluster.status.state == cluster.status.State.DELETING:
- # Wait for cluster to be deleted
- self._wait_for_cluster_in_deleting_state(hook)
- # Create new cluster
- cluster = self._create_cluster(hook)
+ if not self.deferrable:
+ # Check if cluster is not in ERROR state
self._handle_error_state(hook, cluster)
-
+ if cluster.status.state == cluster.status.State.CREATING:
+ # Wait for cluster to be created
+ cluster = self._wait_for_cluster_in_creating_state(hook)
+ self._handle_error_state(hook, cluster)
+ elif cluster.status.state == cluster.status.State.DELETING:
+ # Wait for cluster to be deleted
+ self._wait_for_cluster_in_deleting_state(hook)
+ # Create new cluster
+ cluster = self._create_cluster(hook)
+ self._handle_error_state(hook, cluster)
+ else:
+ self.defer(
+ trigger=DataprocClusterTrigger(
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ region=self.region,
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ poll_interval=self.poll_interval,
+ ),
+ method_name="execute_complete",
+ )
return Cluster.to_dict(cluster)
+ def execute_complete(self, context, event=None) -> None:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes
execution was
+ successful.
+ """
+ if event is None:
+ raise AirflowException("Cluster failed.")
+ cluster_state = event["cluster_state"]
+ cluster_name = event["cluster_name"]
+
+ if cluster_state == ClusterStatus.State.ERROR:
+ raise AirflowException("Cluster is in ERROR state.")
+ if cluster_state == ClusterStatus.State.DELETING:
+ raise AirflowException(f"Cluster is being
deleted:\n{cluster_name}")
+ self.log.info("%s completed successfully.", self.task_id)
Review Comment:
You should push the cluster details to XCom, as the sync version of the operator does.
##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -84,3 +84,64 @@ async def run(self):
raise AirflowException(f"Dataproc job execution failed
{self.job_id}")
await asyncio.sleep(self.polling_interval_seconds)
yield TriggerEvent({"job_id": self.job_id, "job_state": state})
+
+
+class DataprocClusterTrigger(BaseTrigger):
Review Comment:
Does it make sense to extend this class from DataprocBaseTrigger instead of
BaseTrigger?
https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/triggers/dataproc.py#L31
WDYT?
##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -602,22 +609,50 @@ def execute(self, context: Context) -> dict:
raise
self.log.info("Cluster already exists.")
cluster = self._get_cluster(hook)
-
- # Check if cluster is not in ERROR state
- self._handle_error_state(hook, cluster)
- if cluster.status.state == cluster.status.State.CREATING:
- # Wait for cluster to be created
- cluster = self._wait_for_cluster_in_creating_state(hook)
- self._handle_error_state(hook, cluster)
- elif cluster.status.state == cluster.status.State.DELETING:
- # Wait for cluster to be deleted
- self._wait_for_cluster_in_deleting_state(hook)
- # Create new cluster
- cluster = self._create_cluster(hook)
+ if not self.deferrable:
+ # Check if cluster is not in ERROR state
self._handle_error_state(hook, cluster)
-
+ if cluster.status.state == cluster.status.State.CREATING:
+ # Wait for cluster to be created
+ cluster = self._wait_for_cluster_in_creating_state(hook)
+ self._handle_error_state(hook, cluster)
+ elif cluster.status.state == cluster.status.State.DELETING:
+ # Wait for cluster to be deleted
+ self._wait_for_cluster_in_deleting_state(hook)
+ # Create new cluster
+ cluster = self._create_cluster(hook)
Review Comment:
In the case of the async operator, _create_cluster can be an async API call here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]