This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d361761dee Early delete a Dataproc cluster if started in the ERROR state. (#33668)
d361761dee is described below
commit d361761deeffe628f3c17ab0debd0e11515c22da
Author: Kristopher Kane <[email protected]>
AuthorDate: Wed Aug 30 01:29:20 2023 -0400
Early delete a Dataproc cluster if started in the ERROR state. (#33668)
* Early delete a Dataproc cluster if started in the ERROR state.
Update airflow/providers/google/cloud/operators/dataproc.py
Co-authored-by: Alex Cazacu <[email protected]>
* Fix up logging of exceptions
---------
Co-authored-by: Alex Cazacu <[email protected]>
---
.../providers/google/cloud/operators/dataproc.py | 22 ++++++++++++-
.../google/cloud/operators/test_dataproc.py | 37 +++++++++++++---------
2 files changed, 43 insertions(+), 16 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py
b/airflow/providers/google/cloud/operators/dataproc.py
index aee0b6fa72..6a8283659b 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -595,13 +595,17 @@ class
DataprocCreateClusterOperator(GoogleCloudBaseOperator):
if cluster.status.state != cluster.status.State.ERROR:
return
self.log.info("Cluster is in ERROR state")
+ self.log.info("Gathering diagnostic information.")
gcs_uri = hook.diagnose_cluster(
region=self.region, cluster_name=self.cluster_name,
project_id=self.project_id
)
self.log.info("Diagnostic information for cluster %s available at:
%s", self.cluster_name, gcs_uri)
if self.delete_on_error:
self._delete_cluster(hook)
- raise AirflowException("Cluster was created but was in ERROR
state.")
+ # The delete op is asynchronous and can cause further failure if
the cluster finishes
+ # deleting between catching AlreadyExists and checking state
+ self._wait_for_cluster_in_deleting_state(hook)
+ raise AirflowException("Cluster was created in an ERROR state then
deleted.")
raise AirflowException("Cluster was created but is in ERROR state")
def _wait_for_cluster_in_deleting_state(self, hook: DataprocHook) -> None:
@@ -668,6 +672,22 @@ class
DataprocCreateClusterOperator(GoogleCloudBaseOperator):
raise
self.log.info("Cluster already exists.")
cluster = self._get_cluster(hook)
+ except AirflowException as ae:
+ # There still could be a cluster created here in an ERROR state
which
+ # should be deleted immediately rather than consuming another
retry attempt
+ # (assuming delete_on_error is true (default))
+ # This reduces overall the number of task attempts from 3 to 2 to
successful cluster creation
+ # assuming the underlying GCE issues have resolved within that
window. Users can configure
+ # a higher number of retry attempts in powers of two with 30s-60s
wait interval
+ try:
+ cluster = self._get_cluster(hook)
+ self._handle_error_state(hook, cluster)
+ except AirflowException as ae_inner:
+ # We could get any number of failures here, including cluster
not found and we
+ # can just ignore to ensure we surface the original cluster
create failure
+ self.log.error(ae_inner, exc_info=True)
+ finally:
+ raise ae
# Check if cluster is not in ERROR state
self._handle_error_state(hook, cluster)
diff --git a/tests/providers/google/cloud/operators/test_dataproc.py
b/tests/providers/google/cloud/operators/test_dataproc.py
index 6ddaec8be8..923a7abae6 100644
--- a/tests/providers/google/cloud/operators/test_dataproc.py
+++ b/tests/providers/google/cloud/operators/test_dataproc.py
@@ -619,13 +619,16 @@ class
TestDataprocClusterCreateOperator(DataprocClusterTestBase):
with pytest.raises(AlreadyExists):
op.execute(context=self.mock_context)
+
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._wait_for_cluster_in_deleting_state"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
- def test_execute_if_cluster_exists_in_error_state(self, mock_hook):
+ def test_execute_if_cluster_exists_in_error_state(self, mock_hook,
mock_wait_for_deleting):
mock_hook.return_value.create_cluster.side_effect =
[AlreadyExists("test")]
cluster_status = mock_hook.return_value.get_cluster.return_value.status
cluster_status.state = 0
cluster_status.State.ERROR = 0
+ mock_wait_for_deleting.return_value.get_cluster.side_effect =
[NotFound]
+
op = DataprocCreateClusterOperator(
task_id=TASK_ID,
region=GCP_REGION,
@@ -650,24 +653,30 @@ class
TestDataprocClusterCreateOperator(DataprocClusterTestBase):
region=GCP_REGION, project_id=GCP_PROJECT,
cluster_name=CLUSTER_NAME
)
+ @mock.patch(DATAPROC_PATH.format("Cluster.to_dict"))
@mock.patch(DATAPROC_PATH.format("exponential_sleep_generator"))
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._create_cluster"))
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._get_cluster"))
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute_if_cluster_exists_in_deleting_state(
- self, mock_hook, mock_get_cluster, mock_create_cluster, mock_generator
+ self,
+ mock_hook,
+ mock_get_cluster,
+ mock_create_cluster,
+ mock_generator,
+ to_dict_mock,
):
- cluster = mock.MagicMock()
- cluster.status.state = 0
- cluster.status.State.DELETING = 0
+ cluster_deleting = mock.MagicMock()
+ cluster_deleting.status.state = 0
+ cluster_deleting.status.State.DELETING = 0
- cluster2 = mock.MagicMock()
- cluster2.status.state = 0
- cluster2.status.State.ERROR = 0
+ cluster_running = mock.MagicMock()
+ cluster_running.status.state = 0
+ cluster_running.status.State.RUNNING = 0
- mock_create_cluster.side_effect = [AlreadyExists("test"), cluster2]
+ mock_create_cluster.side_effect = [AlreadyExists("test"),
cluster_running]
mock_generator.return_value = [0]
- mock_get_cluster.side_effect = [cluster, NotFound("test")]
+ mock_get_cluster.side_effect = [cluster_deleting, NotFound("test")]
op = DataprocCreateClusterOperator(
task_id=TASK_ID,
@@ -679,15 +688,13 @@ class
TestDataprocClusterCreateOperator(DataprocClusterTestBase):
delete_on_error=True,
gcp_conn_id=GCP_CONN_ID,
)
- with pytest.raises(AirflowException):
- op.execute(context=self.mock_context)
+ op.execute(context=self.mock_context)
calls = [mock.call(mock_hook.return_value),
mock.call(mock_hook.return_value)]
mock_get_cluster.assert_has_calls(calls)
mock_create_cluster.assert_has_calls(calls)
- mock_hook.return_value.diagnose_cluster.assert_called_once_with(
- region=GCP_REGION, project_id=GCP_PROJECT,
cluster_name=CLUSTER_NAME
- )
+
+ to_dict_mock.assert_called_once_with(cluster_running)
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
@mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))