This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6ca21bfbad9 Add best-effort cleanup to EksCreateNodegroupOperator on post-create failure (#61145)
6ca21bfbad9 is described below
commit 6ca21bfbad9f9b5c6b769c706f7b13af55801f4e
Author: SameerMesiah97 <[email protected]>
AuthorDate: Tue Feb 10 14:58:01 2026 +0000
Add best-effort cleanup to EksCreateNodegroupOperator on post-create failure (#61145)
---
.../airflow/providers/amazon/aws/operators/eks.py | 65 +++++++++++++---
.../tests/unit/amazon/aws/operators/test_eks.py | 89 +++++++++++++++++++++-
2 files changed, 141 insertions(+), 13 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
index a35a206cadc..2b6ae60dc18 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
@@ -89,6 +89,7 @@ def _create_compute(
fargate_selectors: list | None = None,
create_fargate_profile_kwargs: dict | None = None,
subnets: list[str] | None = None,
+ delete_nodegroup_on_failure: bool = True,
):
log = logging.getLogger(__name__)
eks_hook = EksHook(aws_conn_id=aws_conn_id, region_name=region)
@@ -104,17 +105,49 @@ def _create_compute(
nodeRole=nodegroup_role_arn,
**create_nodegroup_kwargs,
)
- if wait_for_completion:
- log.info("Waiting for nodegroup to provision. This will take some time.")
- wait(
- waiter=eks_hook.conn.get_waiter("nodegroup_active"),
- waiter_delay=waiter_delay,
- waiter_max_attempts=waiter_max_attempts,
- args={"clusterName": cluster_name, "nodegroupName": nodegroup_name},
- failure_message="Nodegroup creation failed",
- status_message="Nodegroup status is",
- status_args=["nodegroup.status"],
+ try:
+ if wait_for_completion:
+ log.info("Waiting for nodegroup to provision. This will take some time.")
+ wait(
+ waiter=eks_hook.conn.get_waiter("nodegroup_active"),
+ waiter_delay=waiter_delay,
+ waiter_max_attempts=waiter_max_attempts,
+ args={"clusterName": cluster_name, "nodegroupName": nodegroup_name},
+ failure_message="Nodegroup creation failed",
+ status_message="Nodegroup status is",
+ status_args=["nodegroup.status"],
+ )
+
+ # waiter_with_logging.wait wraps botocore WaiterError in AirflowException.
+ # WaiterError is caught to handle changes in the implementation of waiter_with_logging.wait.
+ except (WaiterError, AirflowException):
+ # Best-effort cleanup when post-initiation steps fail (e.g. IAM/permission errors).
+ log.exception(
+ "Nodegroup '%s' in cluster '%s' failed after creation during wait phase",
+ nodegroup_name,
+ cluster_name,
)
+
+ # delete_nodegroup_on_failure defaults to True to prevent orphaned nodegroups.
+ if delete_nodegroup_on_failure:
+ try:
+ eks_hook.delete_nodegroup(
+ clusterName=cluster_name,
+ nodegroupName=nodegroup_name,
+ )
+ log.info(
+ "Issued delete request for nodegroup '%s' in cluster '%s' after failure.",
+ nodegroup_name,
+ cluster_name,
+ )
+ except ClientError:
+ log.exception(
+ "Failed to cleanup nodegroup '%s' in cluster '%s' after failure; "
+ "manual cleanup may be required.",
+ nodegroup_name,
+ cluster_name,
+ )
+ raise
elif compute == "fargate" and fargate_profile_name:
# this is to satisfy mypy
create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
@@ -199,7 +232,8 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
:param deferrable: If True, the operator will wait asynchronously for the job to complete.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)
-
+ :param delete_nodegroup_on_failure: Whether to attempt best-effort deletion of the managed nodegroup if creation
+ fails during the wait phase after successful initiation. Defaults to True.
"""
aws_hook_class = EksHook
@@ -238,6 +272,7 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
waiter_delay: int = 30,
waiter_max_attempts: int = 40,
+ delete_nodegroup_on_failure: bool = True,
**kwargs,
) -> None:
self.compute = compute
@@ -258,6 +293,7 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
self.fargate_selectors = fargate_selectors or [{"namespace": DEFAULT_NAMESPACE_NAME}]
self.fargate_profile_name = fargate_profile_name
self.deferrable = deferrable
+ self.delete_nodegroup_on_failure = delete_nodegroup_on_failure
if region is not None:
warnings.warn(
@@ -379,6 +415,7 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
fargate_selectors=self.fargate_selectors,
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
subnets=cast("list[str]", self.resources_vpc_config.get("subnetIds")),
+ delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
)
if self.compute == "fargate":
self.defer(
@@ -454,7 +491,8 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
:param deferrable: If True, the operator will wait asynchronously for the nodegroup to be created.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)
-
+ :param delete_nodegroup_on_failure: Whether to attempt best-effort deletion of the managed nodegroup if creation
+ fails during the wait phase after successful initiation. Defaults to True.
"""
aws_hook_class = EksHook
@@ -479,6 +517,7 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
waiter_delay: int = 30,
waiter_max_attempts: int = 80,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+ delete_nodegroup_on_failure: bool = True,
**kwargs,
) -> None:
self.nodegroup_subnets = nodegroup_subnets
@@ -493,6 +532,7 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.deferrable = deferrable
+ self.delete_nodegroup_on_failure = delete_nodegroup_on_failure
if region is not None:
warnings.warn(
@@ -531,6 +571,7 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
nodegroup_role_arn=self.nodegroup_role_arn,
create_nodegroup_kwargs=self.create_nodegroup_kwargs,
subnets=self.nodegroup_subnets,
+ delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
)
if self.deferrable:
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
index d997483a60d..3cf453cb5e0 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
@@ -21,9 +21,10 @@ from typing import Any, TypedDict
from unittest import mock
import pytest
+from botocore.exceptions import ClientError
from botocore.waiter import Waiter
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, EksHook
from airflow.providers.amazon.aws.operators.eks import (
EksCreateClusterOperator,
@@ -586,6 +587,92 @@ class TestEksCreateNodegroupOperator:
)
assert m.operator.region_name == "us-east-2"
+ @mock.patch.object(EksHook, "delete_nodegroup")
+ @mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
+ @mock.patch.object(EksHook, "create_nodegroup")
+ def test_nodegroup_cleanup_on_waiter_auth_failure(
+ self,
+ mock_create_nodegroup,
+ mock_waiter,
+ mock_delete_nodegroup,
+ ):
+ # Airflow currently wraps waiter errors with AirflowException, but the code intentionally supports both.
+ waiter_error = AirflowException("Nodegroup creation failed: Waiter NodegroupActive failed")
+ mock_waiter.side_effect = waiter_error
+
+ operator = EksCreateNodegroupOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ nodegroup_name=NODEGROUP_NAME,
+ nodegroup_subnets=SUBNET_IDS,
+ nodegroup_role_arn=NODEROLE_ARN[1],
+ wait_for_completion=True,
+ delete_nodegroup_on_failure=True,
+ )
+
+ with pytest.raises(AirflowException):
+ operator.execute({})
+
+ # Nodegroup creation happened.
+ mock_create_nodegroup.assert_called_once()
+
+ # Cleanup attempted.
+ mock_delete_nodegroup.assert_called_once_with(
+ clusterName=CLUSTER_NAME,
+ nodegroupName=NODEGROUP_NAME,
+ )
+
+ @mock.patch.object(EksHook, "delete_nodegroup")
+ @mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
+ @mock.patch.object(EksHook, "create_nodegroup")
+ def test_nodegroup_cleanup_failure_does_not_mask_original_error(
+ self,
+ mock_create_nodegroup,
+ mock_waiter,
+ mock_delete_nodegroup,
+ ):
+ # Airflow currently wraps waiter errors with AirflowException, but the code intentionally supports both.
+ waiter_error = AirflowException("Nodegroup creation failed: Waiter NodegroupActive failed")
+
+ cleanup_error = ClientError(
+ error_response={
+ "Error": {
+ "Code": "UnauthorizedOperation",
+ "Message": "You are not authorized to perform this operation",
+ }
+ },
+ operation_name="DeleteNodegroup",
+ )
+
+ mock_waiter.side_effect = waiter_error
+ mock_delete_nodegroup.side_effect = cleanup_error
+
+ operator = EksCreateNodegroupOperator(
+ task_id=TASK_ID,
+ cluster_name=CLUSTER_NAME,
+ nodegroup_name=NODEGROUP_NAME,
+ nodegroup_subnets=SUBNET_IDS,
+ nodegroup_role_arn=NODEROLE_ARN[1],
+ wait_for_completion=True,
+ delete_nodegroup_on_failure=True,
+ )
+
+ with pytest.raises(AirflowException) as exc:
+ operator.execute({})
+
+ # Original error preserved.
+ assert isinstance(exc.value, AirflowException)
+ assert "Nodegroup creation failed" in str(exc.value)
+
+ # Nodegroup creation happened.
+ mock_create_nodegroup.assert_called_once()
+
+ # Cleanup attempted.
+ mock_delete_nodegroup.assert_called_once_with(
+ clusterName=CLUSTER_NAME,
+ nodegroupName=NODEGROUP_NAME,
+ )
+
class TestEksDeleteClusterOperator:
def setup_method(self) -> None: