This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 88f164541fe EC2CreateInstanceOperator could leave EC2 instances 
running when failures (#60904)
88f164541fe is described below

commit 88f164541fe01d7247f969356f39029d7b87c3a6
Author: SameerMesiah97 <[email protected]>
AuthorDate: Thu Jan 22 17:10:10 2026 +0000

    EC2CreateInstanceOperator could leave EC2 instances running when failures 
(#60904)
    
    occurred after successful instance creation (e.g. waiter failures due to 
missing
    DescribeInstances permissions).
    
    This change adds best-effort cleanup when post-creation steps fail by 
attempting
    to terminate created instances. Cleanup errors are logged but do not mask 
the
    original exception.
    
    Tests cover successful cleanup on failure and ensure cleanup failures do not
    override the original error.
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../airflow/providers/amazon/aws/operators/ec2.py  | 63 +++++++++++-------
 .../tests/unit/amazon/aws/operators/test_ec2.py    | 77 ++++++++++++++++++++++
 2 files changed, 116 insertions(+), 24 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
index 78202e7bd8f..acacaaf779f 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
@@ -218,31 +218,46 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
             MaxCount=self.max_count,
             **self.config,
         )["Instances"]
-
-        instance_ids = self._on_kill_instance_ids = [instance["InstanceId"] 
for instance in instances]
-        # Console link is for EC2 dashboard list, not individual instances 
when more than 1 instance
-
-        EC2InstanceDashboardLink.persist(
-            context=context,
-            operator=self,
-            region_name=self.hook.conn_region_name,
-            aws_partition=self.hook.conn_partition,
-            
instance_ids=EC2InstanceDashboardLink.format_instance_id_filter(instance_ids),
-        )
-        for instance_id in instance_ids:
-            self.log.info("Created EC2 instance %s", instance_id)
-
-            if self.wait_for_completion:
-                self.hook.get_waiter("instance_running").wait(
-                    InstanceIds=[instance_id],
-                    WaiterConfig={
-                        "Delay": self.poll_interval,
-                        "MaxAttempts": self.max_attempts,
-                    },
+        try:
+            instance_ids = self._on_kill_instance_ids = 
[instance["InstanceId"] for instance in instances]
+            # Console link is for EC2 dashboard list, not individual instances 
when more than 1 instance
+
+            EC2InstanceDashboardLink.persist(
+                context=context,
+                operator=self,
+                region_name=self.hook.conn_region_name,
+                aws_partition=self.hook.conn_partition,
+                
instance_ids=EC2InstanceDashboardLink.format_instance_id_filter(instance_ids),
+            )
+            for instance_id in instance_ids:
+                self.log.info("Created EC2 instance %s", instance_id)
+
+                if self.wait_for_completion:
+                    self.hook.get_waiter("instance_running").wait(
+                        InstanceIds=[instance_id],
+                        WaiterConfig={
+                            "Delay": self.poll_interval,
+                            "MaxAttempts": self.max_attempts,
+                        },
+                    )
+
+            # leave "_on_kill_instance_ids" in place for finishing 
post-processing
+            return instance_ids
+
+        # Best-effort cleanup when post-creation steps fail (e.g. 
IAM/permission errors).
+        except Exception:
+            self.log.exception(
+                "Exception after EC2 instance creation; attempting cleanup for 
instances %s",
+                instance_ids,
+            )
+            try:
+                self.hook.terminate_instances(instance_ids=instance_ids)
+            except Exception:
+                self.log.exception(
+                    "Failed to cleanup EC2 instances %s after task failure",
+                    instance_ids,
                 )
-
-        # leave "_on_kill_instance_ids" in place for finishing post-processing
-        return instance_ids
+            raise
 
     def on_kill(self) -> None:
         instance_ids = getattr(self, "_on_kill_instance_ids", [])
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
index 51d7963e229..89eae7cdd0f 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
@@ -17,7 +17,10 @@
 # under the License.
 from __future__ import annotations
 
+from unittest import mock
+
 import pytest
+from botocore.exceptions import ClientError, WaiterError
 from moto import mock_aws
 
 from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
@@ -96,6 +99,80 @@ class TestEC2CreateInstanceOperator(BaseEc2TestClass):
         )
         validate_template_fields(ec2_operator)
 
+    @mock_aws
+    def test_cleanup_on_post_create_failure(self):
+        ec2_hook = EC2Hook()
+
+        operator = EC2CreateInstanceOperator(
+            task_id="test_cleanup_on_error",
+            image_id=self._get_image_id(ec2_hook),
+            wait_for_completion=True,
+        )
+
+        waiter_error = WaiterError(
+            "InstanceRunning",
+            "You are not authorized to perform this operation",
+            {},
+        )
+
+        # Force failure after instance creation (e.g. missing 
DescribeInstances permission).
+        with mock.patch.object(operator.hook, "get_waiter") as mock_get_waiter:
+            mock_get_waiter.return_value.wait.side_effect = waiter_error
+            with pytest.raises(WaiterError) as exc:
+                operator.execute(None)
+
+        # Ensure the original waiter exception is propagated unchanged.
+        assert exc.value is waiter_error
+
+        # Instance must have been terminated.
+        # We know exactly one instance was created.
+        instances = list(ec2_hook.conn.instances.all())
+        assert len(instances) == 1
+
+        instance = instances[0]
+        assert instance.state["Name"] == "terminated"
+
+    @mock_aws
+    def test_cleanup_failure_propagates_original_exception(self):
+        ec2_hook = EC2Hook()
+
+        operator = EC2CreateInstanceOperator(
+            task_id="test_cleanup_failure_does_not_mask_error",
+            image_id=self._get_image_id(ec2_hook),
+            wait_for_completion=True,
+        )
+
+        waiter_error = WaiterError(
+            "InstanceRunning",
+            "You are not authorized to perform this operation",
+            {},
+        )
+
+        cleanup_error = ClientError(
+            error_response={
+                "Error": {
+                    "Code": "UnauthorizedOperation",
+                    "Message": "You are not authorized to perform this 
operation",
+                }
+            },
+            operation_name="TerminateInstances",
+        )
+
+        with (
+            mock.patch.object(operator.hook, "get_waiter") as mock_get_waiter,
+            mock.patch.object(operator.hook, "terminate_instances") as 
mock_terminate,
+        ):
+            mock_get_waiter.return_value.wait.side_effect = waiter_error
+            mock_terminate.side_effect = cleanup_error
+
+            with pytest.raises(WaiterError) as exc:
+                operator.execute(None)
+
+            # Ensure the original waiter exception is propagated unchanged.
+            assert exc.value is waiter_error
+
+        # Cleanup is best-effort; failure to terminate must not override the 
original error.
+
 
 class TestEC2TerminateInstanceOperator(BaseEc2TestClass):
     def test_init(self):

Reply via email to