acazacu commented on code in PR #33668:
URL: https://github.com/apache/airflow/pull/33668#discussion_r1304135978


##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -667,6 +671,23 @@ def execute(self, context: Context) -> dict:
                 raise
             self.log.info("Cluster already exists.")
             cluster = self._get_cluster(hook)
+        except AirflowException as ae:
+            # There still could be a cluster created here in an ERROR state which
+            # should be deleted immediately rather than consuming another retry attempt
+            # (assuming delete_on_error is true (default))
+            # This reduces overall the number of task attempts from 3 to 2 to successful cluster creation
+            # assuming the underlying GCE issues have resolved within that window. Users can configure
+            # a higher number of retry attempts in powers of two with 30s-60s wait interval
+            try:
+                cluster = self._get_cluster(hook)
+                # redundant condition checking in order to reuse _handle_error_sate
+                if cluster.status.state == cluster.status.State.ERROR:

Review Comment:
   Ha, I was also bothered by this issue and was about to open a PR for it. Nice catch!
   
   Unless I'm mistaken, `_handle_error_state` already performs the same check at the top of its body. Isn't it redundant to repeat it here?
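
To illustrate the redundancy point: when a helper starts with a guard clause, callers can invoke it unconditionally. The sketch below is a minimal illustration only; `State`, `FakeCluster`, and `handle_error_state` are hypothetical stand-ins, and the early return is an assumption about how `_handle_error_state` behaves, not a copy of the provider's code.

```python
from enum import Enum


class State(Enum):
    RUNNING = "RUNNING"
    ERROR = "ERROR"


class FakeCluster:
    """Hypothetical stand-in for the cluster object returned by the hook."""

    def __init__(self, state):
        self.state = state


deleted = []  # records which clusters the helper cleaned up


def handle_error_state(cluster):
    # Assumed guard clause: do nothing unless the cluster is in ERROR.
    if cluster.state != State.ERROR:
        return
    # Placeholder for the real cleanup (e.g. deleting the cluster).
    deleted.append(cluster)


# The caller needs no check of its own; the guard already covers it.
handle_error_state(FakeCluster(State.RUNNING))  # no-op
handle_error_state(FakeCluster(State.ERROR))    # triggers cleanup
```

If the real helper has such a guard, the extra `if` at the call site only repeats it.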



##########
airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -667,6 +671,23 @@ def execute(self, context: Context) -> dict:
                 raise
             self.log.info("Cluster already exists.")
             cluster = self._get_cluster(hook)
+        except AirflowException as ae:
+            # There still could be a cluster created here in an ERROR state which
+            # should be deleted immediately rather than consuming another retry attempt
+            # (assuming delete_on_error is true (default))
+            # This reduces overall the number of task attempts from 3 to 2 to successful cluster creation
+            # assuming the underlying GCE issues have resolved within that window. Users can configure
+            # a higher number of retry attempts in powers of two with 30s-60s wait interval
+            try:
+                cluster = self._get_cluster(hook)
+                # redundant condition checking in order to reuse _handle_error_sate
+                if cluster.status.state == cluster.status.State.ERROR:
+                    self._handle_error_state(hook, cluster)
+            except AirflowException:
+                # We could get any number of failures here, including cluster not found and we
+                # can just ignore to ensure we surface the original cluster create failure
+                pass
+            raise ae

Review Comment:
   ```suggestion
               finally:
                   # We could get any number of failures here, including cluster not found and we
                   # can just ignore to ensure we surface the original cluster create failure
                   raise ae
   ```
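
For reference, a minimal sketch of how a `raise` inside `finally` behaves in plain Python. `CreateError`, `CleanupError`, and `cleanup_then_reraise` are hypothetical names used only for illustration; they are not part of Airflow.

```python
class CreateError(Exception):
    """Hypothetical stand-in for the original cluster-create failure."""


class CleanupError(Exception):
    """Hypothetical stand-in for a failure during the follow-up cleanup."""


def cleanup_then_reraise(cleanup, original):
    try:
        cleanup()
    finally:
        # Raising here replaces whatever exception cleanup() raised,
        # so the caller always sees the original error.
        raise original


def failing_cleanup():
    raise CleanupError("cluster not found")


def successful_cleanup():
    return None


for cleanup in (failing_cleanup, successful_cleanup):
    try:
        cleanup_then_reraise(cleanup, CreateError("create failed"))
    except CreateError as err:
        print(f"surfaced: {err}")
```

One difference from `except AirflowException: pass` is worth noting: a `raise` in `finally` replaces any in-flight exception, not just one type, so unrelated errors from the cleanup path would also be masked by the original failure.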



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
