pankajastro commented on code in PR #28529:
URL: https://github.com/apache/airflow/pull/28529#discussion_r1059079498


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -84,3 +84,64 @@ async def run(self):
                     raise AirflowException(f"Dataproc job execution failed 
{self.job_id}")
             await asyncio.sleep(self.polling_interval_seconds)
         yield TriggerEvent({"job_id": self.job_id, "job_state": state})
+
+
+class DataprocClusterTrigger(BaseTrigger):
+    """
+    Trigger that periodically polls information from Dataproc API to verify 
status.
+    Implementation leverages asynchronous transport.
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        region: str,
+        project_id: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
+        poll_interval: int = 5,
+    ):
+        super().__init__()
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.region = region
+        self.poll_interval = poll_interval
+        self.delegate_to = delegate_to
+        self.hook = DataprocAsyncHook(
+            delegate_to=self.delegate_to,
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+    def serialize(self):
+        return (
+            
"airflow.providers.google.cloud.triggers.dataproc.DataprocClusterTrigger",
+            {
+                "cluster_name": self.cluster_name,
+                "project_id": self.project_id,
+                "region": self.region,
+                "gcp_conn_id": self.gcp_conn_id,
+                "delegate_to": self.delegate_to,
+                "impersonation_chain": self.impersonation_chain,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self):
+        while True:
+            cluster = await self.hook.get_cluster(
+                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
+            )
+            state = cluster.status.state
+            self.log.info("Dataproc cluster: %s is in state: %s", 
self.cluster_name, state)
+            if state in (
+                ClusterStatus.State.ERROR,
+                ClusterStatus.State.RUNNING,
+                ClusterStatus.State.DELETING,

Review Comment:
   In the case of DELETING state, you should wait for the cluster to get 
deleted and then try to re-create it. I'm talking here when you are using this 
trigger for DataprocCreateClusterOperator



##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -602,22 +609,50 @@ def execute(self, context: Context) -> dict:
                 raise
             self.log.info("Cluster already exists.")
             cluster = self._get_cluster(hook)
-
-        # Check if cluster is not in ERROR state
-        self._handle_error_state(hook, cluster)
-        if cluster.status.state == cluster.status.State.CREATING:
-            # Wait for cluster to be created
-            cluster = self._wait_for_cluster_in_creating_state(hook)
-            self._handle_error_state(hook, cluster)
-        elif cluster.status.state == cluster.status.State.DELETING:
-            # Wait for cluster to be deleted
-            self._wait_for_cluster_in_deleting_state(hook)
-            # Create new cluster
-            cluster = self._create_cluster(hook)
+        if not self.deferrable:
+            # Check if cluster is not in ERROR state
             self._handle_error_state(hook, cluster)
-
+            if cluster.status.state == cluster.status.State.CREATING:
+                # Wait for cluster to be created
+                cluster = self._wait_for_cluster_in_creating_state(hook)
+                self._handle_error_state(hook, cluster)
+            elif cluster.status.state == cluster.status.State.DELETING:
+                # Wait for cluster to be deleted
+                self._wait_for_cluster_in_deleting_state(hook)
+                # Create new cluster
+                cluster = self._create_cluster(hook)
+                self._handle_error_state(hook, cluster)
+        else:
+            self.defer(
+                trigger=DataprocClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    project_id=self.project_id,
+                    region=self.region,
+                    gcp_conn_id=self.gcp_conn_id,
+                    impersonation_chain=self.impersonation_chain,
+                    poll_interval=self.poll_interval,
+                ),
+                method_name="execute_complete",
+            )
         return Cluster.to_dict(cluster)
 
+    def execute_complete(self, context, event=None) -> None:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes 
execution was
+        successful.
+        """
+        if event is None:
+            raise AirflowException("Cluster failed.")
+        cluster_state = event["cluster_state"]
+        cluster_name = event["cluster_name"]
+
+        if cluster_state == ClusterStatus.State.ERROR:
+            raise AirflowException("Cluster is in ERROR state.")
+        if cluster_state == ClusterStatus.State.DELETING:
+            raise AirflowException(f"Cluster is being 
deleted:\n{cluster_name}")
+        self.log.info("%s completed successfully.", self.task_id)

Review Comment:
   you should push the cluster details in xcom as sync version of operator does



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -84,3 +84,64 @@ async def run(self):
                     raise AirflowException(f"Dataproc job execution failed 
{self.job_id}")
             await asyncio.sleep(self.polling_interval_seconds)
         yield TriggerEvent({"job_id": self.job_id, "job_state": state})
+
+
+class DataprocClusterTrigger(BaseTrigger):

Review Comment:
   does it make sense to extend this class from DataprocBaseTrigger instead of 
BaseTrigger 
https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/triggers/dataproc.py#L31
 WDYT?



##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -602,22 +609,50 @@ def execute(self, context: Context) -> dict:
                 raise
             self.log.info("Cluster already exists.")
             cluster = self._get_cluster(hook)
-
-        # Check if cluster is not in ERROR state
-        self._handle_error_state(hook, cluster)
-        if cluster.status.state == cluster.status.State.CREATING:
-            # Wait for cluster to be created
-            cluster = self._wait_for_cluster_in_creating_state(hook)
-            self._handle_error_state(hook, cluster)
-        elif cluster.status.state == cluster.status.State.DELETING:
-            # Wait for cluster to be deleted
-            self._wait_for_cluster_in_deleting_state(hook)
-            # Create new cluster
-            cluster = self._create_cluster(hook)
+        if not self.deferrable:
+            # Check if cluster is not in ERROR state
             self._handle_error_state(hook, cluster)
-
+            if cluster.status.state == cluster.status.State.CREATING:
+                # Wait for cluster to be created
+                cluster = self._wait_for_cluster_in_creating_state(hook)
+                self._handle_error_state(hook, cluster)
+            elif cluster.status.state == cluster.status.State.DELETING:
+                # Wait for cluster to be deleted
+                self._wait_for_cluster_in_deleting_state(hook)
+                # Create new cluster
+                cluster = self._create_cluster(hook)

Review Comment:
   In case of async operator _create_cluster can be a async API call here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to