SameerMesiah97 commented on code in PR #61951:
URL: https://github.com/apache/airflow/pull/61951#discussion_r2907980939
##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -805,11 +805,47 @@ def _retry_cluster_creation(self, hook: DataprocHook):
        self.log.info("Cluster created.")
        return Cluster.to_dict(cluster)
+    def _reconcile_cluster_state(self, hook: DataprocHook, cluster: Cluster) -> Cluster:
+
+        if cluster.status.state == cluster.status.State.CREATING:
+            self.log.info("Cluster %s is in CREATING state.", self.cluster_name)
+
+            cluster = self._wait_for_cluster_in_creating_state(hook)
+            self._handle_error_state(hook, cluster)
+        elif cluster.status.state == cluster.status.State.DELETING:
+            self.log.info("Cluster %s is in DELETING state.", self.cluster_name)
+
+            self._wait_for_cluster_in_deleting_state(hook)
+
+            self.log.info("Attempting to re-create cluster: %s", self.cluster_name)
+
+            operation = self._create_cluster(hook)
+            hook.wait_for_operation(
+                timeout=self.timeout,
+                result_retry=self.retry,
+                operation=operation,
+            )
+            cluster = self._get_cluster(hook)
+
Review Comment:
My interpretation of the operator contract here is that reconciliation is
single-pass rather than trying to converge all the way to RUNNING.
The docstring says that if the cluster is in DELETING we wait for deletion
and then recreate the cluster. It doesn’t say we need to additionally wait for
the recreated cluster to transition to RUNNING.
After recreation we rely on the Dataproc LRO completion
(`wait_for_operation`) to indicate that cluster creation finished. At that
point we fetch the cluster and run the usual error handling. So the expectation
here is that LRO success means creation succeeded, rather than enforcing a full
state convergence loop inside this branch.
If we tried to enforce full convergence here, it could mean multiple
reconciliation passes, and we'd also have to decide how many retries to allow
before giving up, the conditions under which we retry, the timeout period, and
similar factors to avoid an infinite loop. That would be a substantial overhaul
of the existing implementation, not just a minor tweak.
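To make the scope concrete, here is a rough, hypothetical sketch of the extra machinery a full convergence loop would need (bounded attempts, a deadline, a poll interval). The `State` enum and `fetch_state` callable are stand-ins for the real Dataproc client types; none of these names exist in the operator today:

```python
# Hypothetical convergence loop, NOT part of this PR. Illustrates the knobs
# the comment mentions: max retries, a timeout, and retry conditions.
import enum
import time


class State(enum.Enum):
    CREATING = "CREATING"
    RUNNING = "RUNNING"
    ERROR = "ERROR"


def converge_to_running(fetch_state, max_attempts=5, deadline_s=600.0, poll_s=1.0):
    """Poll fetch_state() until RUNNING; return the attempt count that succeeded."""
    start = time.monotonic()
    for attempt in range(1, max_attempts + 1):
        if time.monotonic() - start > deadline_s:
            raise TimeoutError("gave up waiting for cluster to reach RUNNING")
        state = fetch_state()
        if state is State.RUNNING:
            return attempt
        if state is State.ERROR:
            # A real implementation would decide here whether ERROR is retryable.
            raise RuntimeError("cluster entered ERROR during convergence")
        time.sleep(poll_s)
    raise RuntimeError(f"cluster not RUNNING after {max_attempts} attempts")
```

Every one of those parameters is a policy decision the current operator contract doesn't make, which is why this belongs in a separate change if at all.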
If you actually mean something like an `if cluster.status.state ==
cluster.status.State.RUNNING` check, that can lead to false negatives because
there are other valid transitional states after CREATING, such as PROVISIONING
or UPDATING. I think `if cluster.status.state in (RUNNING, PROVISIONING,
UPDATING)` would be more robust, but that would go beyond the contract of the
operator. Perhaps this is better suited to a follow-up PR, since it is no
longer about fulfilling the existing contract but about expanding its
guarantees.
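For illustration, the membership check I have in mind would look roughly like this. The enum below is a stand-in; real code would use `ClusterStatus.State` from `google.cloud.dataproc_v1.types`, and the exact set of "healthy" states (including whether PROVISIONING exists there) would need to be verified against the Dataproc API:

```python
# Stand-in enum for illustration only; the operator would use the real
# google.cloud.dataproc_v1 ClusterStatus.State values instead.
import enum


class State(enum.Enum):
    CREATING = "CREATING"
    RUNNING = "RUNNING"
    PROVISIONING = "PROVISIONING"
    UPDATING = "UPDATING"
    ERROR = "ERROR"


# A strict `== RUNNING` check would report these healthy transitional
# states as failures; a membership test avoids that false negative.
HEALTHY_STATES = frozenset({State.RUNNING, State.PROVISIONING, State.UPDATING})


def is_healthy(state: State) -> bool:
    return state in HEALTHY_STATES
```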
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]