SameerMesiah97 commented on code in PR #61951:
URL: https://github.com/apache/airflow/pull/61951#discussion_r2907980939


##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -805,11 +805,47 @@ def _retry_cluster_creation(self, hook: DataprocHook):
         self.log.info("Cluster created.")
         return Cluster.to_dict(cluster)
 
+    def _reconcile_cluster_state(self, hook: DataprocHook, cluster: Cluster) -> Cluster:
+
+        if cluster.status.state == cluster.status.State.CREATING:
+            self.log.info("Cluster %s is in CREATING state.", self.cluster_name)
+
+            cluster = self._wait_for_cluster_in_creating_state(hook)
+            self._handle_error_state(hook, cluster)
+        elif cluster.status.state == cluster.status.State.DELETING:
+            self.log.info("Cluster %s is in DELETING state.", self.cluster_name)
+
+            self._wait_for_cluster_in_deleting_state(hook)
+
+            self.log.info("Attempting to re-create cluster: %s", self.cluster_name)
+
+            operation = self._create_cluster(hook)
+            hook.wait_for_operation(
+                timeout=self.timeout,
+                result_retry=self.retry,
+                operation=operation,
+            )
+            cluster = self._get_cluster(hook)
+

Review Comment:
   My interpretation of the operator contract here is that reconciliation is 
single-pass rather than trying to converge all the way to RUNNING.
   
   The docstring says that if the cluster is in DELETING we wait for deletion and then recreate the cluster. It doesn’t say we also need to wait for the recreated cluster to transition to RUNNING.
   
   After recreation we rely on the Dataproc LRO completion (`wait_for_operation`) to indicate that cluster creation finished. At that point we fetch the cluster and run the usual error handling. So the expectation here is that LRO success means creation succeeded, rather than enforcing a full state convergence loop inside this branch.
   
   If we tried to enforce full convergence here, it could require multiple reconciliation passes, and we’d then have to decide how many retries to allow before giving up, the conditions under which to retry, the timeout budget, and other safeguards against an infinite loop. That would be a substantial overhaul of the existing implementation, not a minor tweak.
   
   If you actually mean something like an `if cluster.status.state == cluster.status.State.RUNNING` check, that can lead to false negatives, because there are other valid transitional states after CREATING, such as PROVISIONING or UPDATING. I think `if cluster.status.state in (RUNNING, PROVISIONING, UPDATING)` would be more robust, but that would go beyond the operator’s contract. Perhaps this is better left for a follow-up PR, since it is no longer about fulfilling the existing contract but about expanding its guarantees.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
