This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ca21bfbad9 Add best-effort cleanup to EksCreateNodegroupOperator on post-create failure (#61145)
6ca21bfbad9 is described below

commit 6ca21bfbad9f9b5c6b769c706f7b13af55801f4e
Author: SameerMesiah97 <[email protected]>
AuthorDate: Tue Feb 10 14:58:01 2026 +0000

    Add best-effort cleanup to EksCreateNodegroupOperator on post-create failure (#61145)
---
 .../airflow/providers/amazon/aws/operators/eks.py  | 65 +++++++++++++---
 .../tests/unit/amazon/aws/operators/test_eks.py    | 89 +++++++++++++++++++++-
 2 files changed, 141 insertions(+), 13 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
index a35a206cadc..2b6ae60dc18 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
@@ -89,6 +89,7 @@ def _create_compute(
     fargate_selectors: list | None = None,
     create_fargate_profile_kwargs: dict | None = None,
     subnets: list[str] | None = None,
+    delete_nodegroup_on_failure: bool = True,
 ):
     log = logging.getLogger(__name__)
     eks_hook = EksHook(aws_conn_id=aws_conn_id, region_name=region)
@@ -104,17 +105,49 @@ def _create_compute(
             nodeRole=nodegroup_role_arn,
             **create_nodegroup_kwargs,
         )
-        if wait_for_completion:
-            log.info("Waiting for nodegroup to provision.  This will take some 
time.")
-            wait(
-                waiter=eks_hook.conn.get_waiter("nodegroup_active"),
-                waiter_delay=waiter_delay,
-                waiter_max_attempts=waiter_max_attempts,
-                args={"clusterName": cluster_name, "nodegroupName": 
nodegroup_name},
-                failure_message="Nodegroup creation failed",
-                status_message="Nodegroup status is",
-                status_args=["nodegroup.status"],
+        try:
+            if wait_for_completion:
+                log.info("Waiting for nodegroup to provision. This will take 
some time.")
+                wait(
+                    waiter=eks_hook.conn.get_waiter("nodegroup_active"),
+                    waiter_delay=waiter_delay,
+                    waiter_max_attempts=waiter_max_attempts,
+                    args={"clusterName": cluster_name, "nodegroupName": 
nodegroup_name},
+                    failure_message="Nodegroup creation failed",
+                    status_message="Nodegroup status is",
+                    status_args=["nodegroup.status"],
+                )
+
+        # waiter_with_logging.wait wraps botocore WaiterError in 
AirflowException.
+        # WaiterError is caught to handle changes in the implementation of 
waiter_with_logging.wait.
+        except (WaiterError, AirflowException):
+            # Best-effort cleanup when post-initiation steps fail (e.g. 
IAM/permission errors).
+            log.exception(
+                "Nodegroup '%s' in cluster '%s' failed after creation during 
wait phase",
+                nodegroup_name,
+                cluster_name,
             )
+
+            # delete_nodegroup_on_failure defaults to True to prevent orphaned 
nodegroups.
+            if delete_nodegroup_on_failure:
+                try:
+                    eks_hook.delete_nodegroup(
+                        clusterName=cluster_name,
+                        nodegroupName=nodegroup_name,
+                    )
+                    log.info(
+                        "Issued delete request for nodegroup '%s' in cluster 
'%s' after failure.",
+                        nodegroup_name,
+                        cluster_name,
+                    )
+                except ClientError:
+                    log.exception(
+                        "Failed to cleanup nodegroup '%s' in cluster '%s' 
after failure; "
+                        "manual cleanup may be required.",
+                        nodegroup_name,
+                        cluster_name,
+                    )
+            raise
     elif compute == "fargate" and fargate_profile_name:
         # this is to satisfy mypy
         create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
@@ -199,7 +232,8 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
     :param deferrable: If True, the operator will wait asynchronously for the 
job to complete.
         This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
         (default: False)
-
+    :param delete_nodegroup_on_failure: Whether to attempt best-effort 
deletion of the managed nodegroup if creation
+        fails during the wait phase after successful initiation. Defaults to 
True.
     """
 
     aws_hook_class = EksHook
@@ -238,6 +272,7 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         waiter_delay: int = 30,
         waiter_max_attempts: int = 40,
+        delete_nodegroup_on_failure: bool = True,
         **kwargs,
     ) -> None:
         self.compute = compute
@@ -258,6 +293,7 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
         self.fargate_selectors = fargate_selectors or [{"namespace": 
DEFAULT_NAMESPACE_NAME}]
         self.fargate_profile_name = fargate_profile_name
         self.deferrable = deferrable
+        self.delete_nodegroup_on_failure = delete_nodegroup_on_failure
 
         if region is not None:
             warnings.warn(
@@ -379,6 +415,7 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
                 fargate_selectors=self.fargate_selectors,
                 
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
                 subnets=cast("list[str]", 
self.resources_vpc_config.get("subnetIds")),
+                delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
             )
             if self.compute == "fargate":
                 self.defer(
@@ -454,7 +491,8 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
     :param deferrable: If True, the operator will wait asynchronously for the 
nodegroup to be created.
         This implies waiting for completion. This mode requires aiobotocore 
module to be installed.
         (default: False)
-
+    :param delete_nodegroup_on_failure: Whether to attempt best-effort 
deletion of the managed nodegroup if creation
+        fails during the wait phase after successful initiation. Defaults to 
True.
     """
 
     aws_hook_class = EksHook
@@ -479,6 +517,7 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
         waiter_delay: int = 30,
         waiter_max_attempts: int = 80,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        delete_nodegroup_on_failure: bool = True,
         **kwargs,
     ) -> None:
         self.nodegroup_subnets = nodegroup_subnets
@@ -493,6 +532,7 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
         self.waiter_delay = waiter_delay
         self.waiter_max_attempts = waiter_max_attempts
         self.deferrable = deferrable
+        self.delete_nodegroup_on_failure = delete_nodegroup_on_failure
 
         if region is not None:
             warnings.warn(
@@ -531,6 +571,7 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
             nodegroup_role_arn=self.nodegroup_role_arn,
             create_nodegroup_kwargs=self.create_nodegroup_kwargs,
             subnets=self.nodegroup_subnets,
+            delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
         )
 
         if self.deferrable:
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
index d997483a60d..3cf453cb5e0 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
@@ -21,9 +21,10 @@ from typing import Any, TypedDict
 from unittest import mock
 
 import pytest
+from botocore.exceptions import ClientError
 from botocore.waiter import Waiter
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.eks import ClusterStates, EksHook
 from airflow.providers.amazon.aws.operators.eks import (
     EksCreateClusterOperator,
@@ -586,6 +587,92 @@ class TestEksCreateNodegroupOperator:
             )
         assert m.operator.region_name == "us-east-2"
 
+    @mock.patch.object(EksHook, "delete_nodegroup")
+    @mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
+    @mock.patch.object(EksHook, "create_nodegroup")
+    def test_nodegroup_cleanup_on_waiter_auth_failure(
+        self,
+        mock_create_nodegroup,
+        mock_waiter,
+        mock_delete_nodegroup,
+    ):
+        # Airflow currently wraps waiter errors with AirflowException, but the 
code intentionally supports both.
+        waiter_error = AirflowException("Nodegroup creation failed: Waiter 
NodegroupActive failed")
+        mock_waiter.side_effect = waiter_error
+
+        operator = EksCreateNodegroupOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            nodegroup_name=NODEGROUP_NAME,
+            nodegroup_subnets=SUBNET_IDS,
+            nodegroup_role_arn=NODEROLE_ARN[1],
+            wait_for_completion=True,
+            delete_nodegroup_on_failure=True,
+        )
+
+        with pytest.raises(AirflowException):
+            operator.execute({})
+
+        # Nodegroup creation happened.
+        mock_create_nodegroup.assert_called_once()
+
+        # Cleanup attempted.
+        mock_delete_nodegroup.assert_called_once_with(
+            clusterName=CLUSTER_NAME,
+            nodegroupName=NODEGROUP_NAME,
+        )
+
+    @mock.patch.object(EksHook, "delete_nodegroup")
+    @mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
+    @mock.patch.object(EksHook, "create_nodegroup")
+    def test_nodegroup_cleanup_failure_does_not_mask_original_error(
+        self,
+        mock_create_nodegroup,
+        mock_waiter,
+        mock_delete_nodegroup,
+    ):
+        # Airflow currently wraps waiter errors with AirflowException, but the 
code intentionally supports both.
+        waiter_error = AirflowException("Nodegroup creation failed: Waiter 
NodegroupActive failed")
+
+        cleanup_error = ClientError(
+            error_response={
+                "Error": {
+                    "Code": "UnauthorizedOperation",
+                    "Message": "You are not authorized to perform this 
operation",
+                }
+            },
+            operation_name="DeleteNodegroup",
+        )
+
+        mock_waiter.side_effect = waiter_error
+        mock_delete_nodegroup.side_effect = cleanup_error
+
+        operator = EksCreateNodegroupOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            nodegroup_name=NODEGROUP_NAME,
+            nodegroup_subnets=SUBNET_IDS,
+            nodegroup_role_arn=NODEROLE_ARN[1],
+            wait_for_completion=True,
+            delete_nodegroup_on_failure=True,
+        )
+
+        with pytest.raises(AirflowException) as exc:
+            operator.execute({})
+
+        # Original error preserved.
+        assert isinstance(exc.value, AirflowException)
+        assert "Nodegroup creation failed" in str(exc.value)
+
+        # Nodegroup creation happened.
+        mock_create_nodegroup.assert_called_once()
+
+        # Cleanup attempted.
+        mock_delete_nodegroup.assert_called_once_with(
+            clusterName=CLUSTER_NAME,
+            nodegroupName=NODEGROUP_NAME,
+        )
+
 
 class TestEksDeleteClusterOperator:
     def setup_method(self) -> None:

Reply via email to