This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0027d171d90 Add bounded retry cleanup for Redshift cluster deletion
after post-creation failure. (#63074)
0027d171d90 is described below
commit 0027d171d908908692f16755a77bc2e4dea42a25
Author: SameerMesiah97 <[email protected]>
AuthorDate: Wed Mar 11 14:15:57 2026 +0000
Add bounded retry cleanup for Redshift cluster deletion after post-creation
failure. (#63074)
Retry deletion when `InvalidClusterState` or `InvalidClusterStateFault`
indicates the cluster is still processing another operation, and bound retries
using `cleanup_timeout_seconds` (default 300s) to avoid indefinite worker
occupation.
Introduce `_attempt_cleanup_with_retry` helper and update existing cleanup
logic so the original exception is always re-raised while cleanup failures are
logged. Update existing cleanup tests to account for retry behavior and add a
new test covering retry on active cluster operations.
---
.../amazon/aws/operators/redshift_cluster.py | 73 ++++++++++++++++++++--
.../amazon/aws/operators/test_redshift_cluster.py | 66 ++++++++++++++++++-
2 files changed, 130 insertions(+), 9 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 47f4277a600..c050987df1d 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -21,7 +21,8 @@ from collections.abc import Sequence
from datetime import timedelta
from typing import TYPE_CHECKING, Any
-from botocore.exceptions import WaiterError
+from botocore.exceptions import ClientError, WaiterError
+from tenacity import Retrying, retry_if_exception, stop_after_delay, wait_fixed
from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
@@ -110,6 +111,9 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
:param deferrable: If True, the operator will run in deferrable mode.
:param delete_cluster_on_failure: If True, best-effort deletion of the redshift cluster will be attempted
    after post-creation failure. Default: True.
+ :param cleanup_timeout_seconds: Maximum time in seconds to attempt
+ best-effort deletion of the cluster when post-creation failure occurs.
+ Default: 300 seconds.
"""
template_fields: Sequence[str] = aws_template_fields(
@@ -193,6 +197,7 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
poll_interval: int = 60,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
delete_cluster_on_failure: bool = True,
+ cleanup_timeout_seconds: int = 300,
**kwargs,
):
super().__init__(**kwargs)
@@ -235,6 +240,65 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
self.deferrable = deferrable
self.kwargs = kwargs
self.delete_cluster_on_failure = delete_cluster_on_failure
+ self.cleanup_timeout_seconds = cleanup_timeout_seconds
+
+ @staticmethod
+ def _retry_if_cluster_busy(exc: BaseException) -> bool:
+ if isinstance(exc, ClientError):
+ return exc.response["Error"]["Code"] in {
+ "InvalidClusterStateFault",
+ "InvalidClusterState",
+ }
+ return False
+
+ def _attempt_cleanup_with_retry(self) -> None:
+ """
+ Attempt bounded best-effort deletion of the cluster.
+
+ This method is only invoked during task failure handling.
+ It does not block until deletion completes and will not
+ mask the original exception.
+ """
+ RETRY_INTERVAL_SECONDS = 60
+
+ retrying = Retrying(
+ retry=retry_if_exception(self._retry_if_cluster_busy),
+ wait=wait_fixed(RETRY_INTERVAL_SECONDS),
+ stop=stop_after_delay(self.cleanup_timeout_seconds),
+ reraise=True,
+ )
+
+ try:
+ for attempt in retrying:
+ with attempt:
+ self.log.info(
+ "Attempt %s: Deleting Redshift cluster %s.",
+ attempt.retry_state.attempt_number,
+ self.cluster_identifier,
+ )
+
+                    # Do not wait for deletion to complete; cleanup is best-effort.
+                    self.hook.delete_cluster(cluster_identifier=self.cluster_identifier)
+
+                    self.log.info(
+                        "Successfully initiated deletion of Redshift cluster %s.",
+                        self.cluster_identifier,
+                    )
+
+ return
+
+ except Exception as e:
+ if self._retry_if_cluster_busy(e):
+                self.log.exception(
+                    "Timed out after %s seconds while trying to delete Redshift cluster %s.",
+                    self.cleanup_timeout_seconds,
+                    self.cluster_identifier,
+                )
+ else:
+                self.log.exception(
+                    "Unexpected error while attempting to delete Redshift cluster %s.",
+                    self.cluster_identifier,
+                )
def execute(self, context: Context):
self.log.info("Creating Redshift cluster %s", self.cluster_identifier)
@@ -340,13 +404,10 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
if self.delete_cluster_on_failure:
try:
-                self.log.warning(
-                    "Attempting deletion of Redshift cluster %s.", self.cluster_identifier
-                )
-                self.hook.delete_cluster(cluster_identifier=self.cluster_identifier)
+                self._attempt_cleanup_with_retry()
except Exception:
                self.log.exception(
-                    "Failed while attempting to delete Reshift cluster %s.",
+                    "Failed while attempting to delete Redshift cluster %s.",
                    self.cluster_identifier,
                )
raise
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py
index d8dfe1841b2..59943cfed29 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py
@@ -176,6 +176,7 @@ class TestRedshiftCreateClusterOperator:
cluster_type="single-node",
wait_for_completion=True,
delete_cluster_on_failure=True,
+ cleanup_timeout_seconds=300,
)
with pytest.raises(WaiterError):
@@ -184,10 +185,11 @@ class TestRedshiftCreateClusterOperator:
# Cluster creation happened.
mock_conn.create_cluster.assert_called_once()
- # Cleanup attempted.
- mock_delete_cluster.assert_called_once_with(
+ # Cleanup attempted at least once.
+ mock_delete_cluster.assert_called_with(
cluster_identifier="test-cluster",
)
+ assert mock_delete_cluster.call_count >= 1
@mock.patch.object(RedshiftHook, "delete_cluster")
@mock.patch.object(RedshiftHook, "conn")
@@ -226,6 +228,7 @@ class TestRedshiftCreateClusterOperator:
cluster_type="single-node",
wait_for_completion=True,
delete_cluster_on_failure=True,
+ cleanup_timeout_seconds=300,
)
with pytest.raises(WaiterError) as exc:
@@ -238,10 +241,67 @@ class TestRedshiftCreateClusterOperator:
mock_conn.create_cluster.assert_called_once()
# Cleanup attempted despite failure.
- mock_delete_cluster.assert_called_once_with(
+ mock_delete_cluster.assert_called_with(
cluster_identifier="test-cluster",
)
+    assert mock_delete_cluster.call_count >= 1
+
+ @mock.patch("tenacity.nap.time.sleep", mock.MagicMock())
+ @mock.patch.object(RedshiftHook, "delete_cluster")
+ @mock.patch.object(RedshiftHook, "conn")
+ def test_create_cluster_cleanup_retries_on_active_operation(
+ self,
+ mock_conn,
+ mock_delete_cluster,
+ ):
+ # Simulate waiter failure (e.g. DescribeClusters denied).
+ waiter_error = WaiterError(
+ name="ClusterAvailable",
+ reason="AccessDenied for DescribeClusters",
+ last_response={},
+ )
+ mock_conn.get_waiter.return_value.wait.side_effect = waiter_error
+
+ # First deletion attempt fails due to cluster still modifying.
+ active_operation_error = ClientError(
+ error_response={
+ "Error": {
+ "Code": "InvalidClusterStateFault",
+ "Message": "Cluster currently modifying",
+ }
+ },
+ operation_name="DeleteCluster",
+ )
+
+ # Second attempt succeeds.
+ mock_delete_cluster.side_effect = [
+ active_operation_error,
+ None,
+ ]
+
+ operator = RedshiftCreateClusterOperator(
+ task_id="task_test",
+ cluster_identifier="test-cluster",
+ node_type="ra3.large",
+ master_username="adminuser",
+ master_user_password="Test123$",
+ cluster_type="single-node",
+ wait_for_completion=True,
+ delete_cluster_on_failure=True,
+ cleanup_timeout_seconds=300,
+ )
+
+ with pytest.raises(WaiterError):
+ operator.execute({})
+
+ # Retry should occur.
+ assert mock_delete_cluster.call_count == 2
+
class TestRedshiftCreateClusterSnapshotOperator:
@mock.patch.object(RedshiftHook, "cluster_status")