This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d361761dee Early delete a Dataproc cluster if started in the ERROR state. (#33668)
d361761dee is described below

commit d361761deeffe628f3c17ab0debd0e11515c22da
Author: Kristopher Kane <[email protected]>
AuthorDate: Wed Aug 30 01:29:20 2023 -0400

    Early delete a Dataproc cluster if started in the ERROR state. (#33668)
    
    * Early delete a Dataproc cluster if started in the ERROR state.
    
    Update airflow/providers/google/cloud/operators/dataproc.py
    
    Co-authored-by: Alex Cazacu <[email protected]>
    
    * fixing up logging of exceptions
    
    ---------
    
    Co-authored-by: Alex Cazacu <[email protected]>
---
 .../providers/google/cloud/operators/dataproc.py   | 22 ++++++++++++-
 .../google/cloud/operators/test_dataproc.py        | 37 +++++++++++++---------
 2 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index aee0b6fa72..6a8283659b 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -595,13 +595,17 @@ class 
DataprocCreateClusterOperator(GoogleCloudBaseOperator):
         if cluster.status.state != cluster.status.State.ERROR:
             return
         self.log.info("Cluster is in ERROR state")
+        self.log.info("Gathering diagnostic information.")
         gcs_uri = hook.diagnose_cluster(
             region=self.region, cluster_name=self.cluster_name, 
project_id=self.project_id
         )
         self.log.info("Diagnostic information for cluster %s available at: 
%s", self.cluster_name, gcs_uri)
         if self.delete_on_error:
             self._delete_cluster(hook)
-            raise AirflowException("Cluster was created but was in ERROR 
state.")
+            # The delete op is asynchronous and can cause further failure if 
the cluster finishes
+            # deleting between catching AlreadyExists and checking state
+            self._wait_for_cluster_in_deleting_state(hook)
+            raise AirflowException("Cluster was created in an ERROR state then 
deleted.")
         raise AirflowException("Cluster was created but is in ERROR state")
 
     def _wait_for_cluster_in_deleting_state(self, hook: DataprocHook) -> None:
@@ -668,6 +672,22 @@ class 
DataprocCreateClusterOperator(GoogleCloudBaseOperator):
                 raise
             self.log.info("Cluster already exists.")
             cluster = self._get_cluster(hook)
+        except AirflowException as ae:
+            # There still could be a cluster created here in an ERROR state 
which
+            # should be deleted immediately rather than consuming another 
retry attempt
+            # (assuming delete_on_error is true (default))
+            # This reduces overall the number of task attempts from 3 to 2 to 
successful cluster creation
+            # assuming the underlying GCE issues have resolved within that 
window. Users can configure
+            # a higher number of retry attempts in powers of two with 30s-60s 
wait interval
+            try:
+                cluster = self._get_cluster(hook)
+                self._handle_error_state(hook, cluster)
+            except AirflowException as ae_inner:
+                # We could get any number of failures here, including cluster 
not found and we
+                # can just ignore to ensure we surface the original cluster 
create failure
+                self.log.error(ae_inner, exc_info=True)
+            finally:
+                raise ae
 
         # Check if cluster is not in ERROR state
         self._handle_error_state(hook, cluster)
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py
index 6ddaec8be8..923a7abae6 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -619,13 +619,16 @@ class 
TestDataprocClusterCreateOperator(DataprocClusterTestBase):
         with pytest.raises(AlreadyExists):
             op.execute(context=self.mock_context)
 
+    @mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._wait_for_cluster_in_deleting_state"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
-    def test_execute_if_cluster_exists_in_error_state(self, mock_hook):
+    def test_execute_if_cluster_exists_in_error_state(self, mock_hook, 
mock_wait_for_deleting):
         mock_hook.return_value.create_cluster.side_effect = 
[AlreadyExists("test")]
         cluster_status = mock_hook.return_value.get_cluster.return_value.status
         cluster_status.state = 0
         cluster_status.State.ERROR = 0
 
+        mock_wait_for_deleting.return_value.get_cluster.side_effect = 
[NotFound]
+
         op = DataprocCreateClusterOperator(
             task_id=TASK_ID,
             region=GCP_REGION,
@@ -650,24 +653,30 @@ class 
TestDataprocClusterCreateOperator(DataprocClusterTestBase):
             region=GCP_REGION, project_id=GCP_PROJECT, 
cluster_name=CLUSTER_NAME
         )
 
+    @mock.patch(DATAPROC_PATH.format("Cluster.to_dict"))
     @mock.patch(DATAPROC_PATH.format("exponential_sleep_generator"))
     @mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._create_cluster"))
     @mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._get_cluster"))
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     def test_execute_if_cluster_exists_in_deleting_state(
-        self, mock_hook, mock_get_cluster, mock_create_cluster, mock_generator
+        self,
+        mock_hook,
+        mock_get_cluster,
+        mock_create_cluster,
+        mock_generator,
+        to_dict_mock,
     ):
-        cluster = mock.MagicMock()
-        cluster.status.state = 0
-        cluster.status.State.DELETING = 0
+        cluster_deleting = mock.MagicMock()
+        cluster_deleting.status.state = 0
+        cluster_deleting.status.State.DELETING = 0
 
-        cluster2 = mock.MagicMock()
-        cluster2.status.state = 0
-        cluster2.status.State.ERROR = 0
+        cluster_running = mock.MagicMock()
+        cluster_running.status.state = 0
+        cluster_running.status.State.RUNNING = 0
 
-        mock_create_cluster.side_effect = [AlreadyExists("test"), cluster2]
+        mock_create_cluster.side_effect = [AlreadyExists("test"), 
cluster_running]
         mock_generator.return_value = [0]
-        mock_get_cluster.side_effect = [cluster, NotFound("test")]
+        mock_get_cluster.side_effect = [cluster_deleting, NotFound("test")]
 
         op = DataprocCreateClusterOperator(
             task_id=TASK_ID,
@@ -679,15 +688,13 @@ class 
TestDataprocClusterCreateOperator(DataprocClusterTestBase):
             delete_on_error=True,
             gcp_conn_id=GCP_CONN_ID,
         )
-        with pytest.raises(AirflowException):
-            op.execute(context=self.mock_context)
 
+        op.execute(context=self.mock_context)
         calls = [mock.call(mock_hook.return_value), 
mock.call(mock_hook.return_value)]
         mock_get_cluster.assert_has_calls(calls)
         mock_create_cluster.assert_has_calls(calls)
-        mock_hook.return_value.diagnose_cluster.assert_called_once_with(
-            region=GCP_REGION, project_id=GCP_PROJECT, 
cluster_name=CLUSTER_NAME
-        )
+
+        to_dict_mock.assert_called_once_with(cluster_running)
 
     @mock.patch(DATAPROC_PATH.format("DataprocHook"))
     @mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))

Reply via email to