SamWheating commented on code in PR #33668:
URL: https://github.com/apache/airflow/pull/33668#discussion_r1304924763
##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -594,12 +594,16 @@ def _handle_error_state(self, hook: DataprocHook, cluster: Cluster) -> None:
         if cluster.status.state != cluster.status.State.ERROR:
             return
         self.log.info("Cluster is in ERROR state")
+        self.log.info("Gathering diagnostic information.")
         gcs_uri = hook.diagnose_cluster(
             region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
         )
         self.log.info("Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri)
         if self.delete_on_error:
             self._delete_cluster(hook)
+            # The delete op is asynchronous and can cause further failure if the cluster
+            # finishes deleting between catching AlreadyExists and checking state
+            self._wait_for_cluster_in_deleting_state(hook)
             raise AirflowException("Cluster was created but was in ERROR state.")
         raise AirflowException("Cluster was created but is in ERROR state")
Review Comment:
```suggestion
raise AirflowException("Cluster was created but is in ERROR state")
```
I know it's not part of this PR, but can we remove this inner exception? I
think it's redundant with the one below it, which would be raised anyway if
this one were removed.
##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -667,6 +671,23 @@ def execute(self, context: Context) -> dict:
                 raise
             self.log.info("Cluster already exists.")
             cluster = self._get_cluster(hook)
+        except AirflowException as ae:
+            # There could still be a cluster created here in an ERROR state which
+            # should be deleted immediately rather than consuming another retry attempt
+            # (assuming delete_on_error is true (default)).
+            # This reduces the overall number of task attempts from 3 to 2 for a
+            # successful cluster creation, assuming the underlying GCE issues have
+            # resolved within that window. Users can configure a higher number of
+            # retry attempts, backing off in powers of two with a 30s-60s wait interval.
+            try:
+                cluster = self._get_cluster(hook)
+                # redundant condition check in order to reuse _handle_error_state
+                if cluster.status.state == cluster.status.State.ERROR:
+                    self._handle_error_state(hook, cluster)
+            except AirflowException:
+                # We could get any number of failures here, including cluster not
+                # found, and we can just ignore them to ensure we surface the
+                # original cluster create failure
+                pass
Review Comment:
Can we log the exception rather than just swallowing it? There is some
potentially useful information here, such as a cluster being stuck in the
deleting state, that might be worth communicating to users.
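A minimal sketch of what that logging could look like, lifted out of the operator for illustration (the helper name and the `get_cluster`/`handle_error_state` callables are hypothetical stand-ins, not the PR's actual code): catch the secondary failure and emit it at warning level, so the original cluster-create error still propagates unmasked while the cleanup error stays visible.

```python
import logging

log = logging.getLogger("airflow.task")


def cleanup_error_state_cluster(get_cluster, handle_error_state):
    """Hypothetical helper mirroring the nested try/except in execute():
    attempt to clean up an ERROR-state cluster, logging rather than
    silently swallowing any secondary failure."""
    try:
        cluster = get_cluster()
        handle_error_state(cluster)
    except Exception as cleanup_error:
        # Log the secondary failure (e.g. cluster stuck in DELETING, or
        # cluster not found) instead of a bare `pass`, so users can see
        # it while the original create failure is still surfaced.
        log.warning(
            "Ignoring error during cleanup of ERROR-state cluster: %s",
            cleanup_error,
        )
```

In the operator itself this would amount to replacing `pass` with something like `self.log.exception("Ignoring error during cleanup of ERROR-state cluster")`, which also records the traceback.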
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]