This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1491ee166a1 Restrict EC2 cleanup to waiter errors and guard behind
flag. (#61272)
1491ee166a1 is described below
commit 1491ee166a140d5e6e229930cfd094faf67c9e4e
Author: SameerMesiah97 <[email protected]>
AuthorDate: Thu Feb 5 15:20:44 2026 +0000
Restrict EC2 cleanup to waiter errors and guard behind flag. (#61272)
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../airflow/providers/amazon/aws/operators/ec2.py | 27 ++++++++++++++++------
.../tests/unit/amazon/aws/operators/test_ec2.py | 2 ++
2 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
index acacaaf779f..f5364940248 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/ec2.py
@@ -20,6 +20,8 @@ from __future__ import annotations
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
+from botocore.exceptions import WaiterError
+
from airflow.providers.amazon.aws.hooks.ec2 import EC2Hook
from airflow.providers.amazon.aws.links.ec2 import (
EC2InstanceDashboardLink,
@@ -172,6 +174,8 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
:param config: Dictionary for arbitrary parameters to the boto3
run_instances call.
:param wait_for_completion: If True, the operator will wait for the
instance to be
in the `running` state before returning.
+ :param terminate_instance_on_failure: If True, attempt to terminate the
created EC2 instances if a
+ botocore WaiterError is raised after their creation. Defaults to
True.
"""
aws_hook_class = EC2Hook
@@ -196,6 +200,7 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
max_attempts: int = 20,
config: dict | None = None,
wait_for_completion: bool = False,
+ terminate_instance_on_failure: bool = True,
**kwargs,
):
super().__init__(**kwargs)
@@ -206,6 +211,7 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
self.max_attempts = max_attempts
self.config = config or {}
self.wait_for_completion = wait_for_completion
+ self.terminate_instance_on_failure = terminate_instance_on_failure
@property
def _hook_parameters(self) -> dict[str, Any]:
@@ -245,18 +251,25 @@ class EC2CreateInstanceOperator(AwsBaseOperator[EC2Hook]):
return instance_ids
# Best-effort cleanup when post-creation steps fail (e.g.
IAM/permission errors).
- except Exception:
+ except WaiterError:
self.log.exception(
- "Exception after EC2 instance creation; attempting cleanup for
instances %s",
+ "Exception after creation of EC2 instances: %s.",
instance_ids,
)
- try:
- self.hook.terminate_instances(instance_ids=instance_ids)
- except Exception:
- self.log.exception(
- "Failed to cleanup EC2 instances %s after task failure",
+ # terminate_instance_on_failure defaults to True to prevent
orphaned EC2 instances.
+ if self.terminate_instance_on_failure:
+ self.log.info(
+ "Attempting termination of instances: %s.",
instance_ids,
)
+
+ try:
+ self.hook.terminate_instances(instance_ids=instance_ids)
+ except Exception:
+ self.log.exception(
+ "Failed to terminate EC2 instances: %s after task
failure.",
+ instance_ids,
+ )
raise
def on_kill(self) -> None:
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
b/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
index 89eae7cdd0f..8faa74c565a 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_ec2.py
@@ -107,6 +107,7 @@ class TestEC2CreateInstanceOperator(BaseEc2TestClass):
task_id="test_cleanup_on_error",
image_id=self._get_image_id(ec2_hook),
wait_for_completion=True,
+ terminate_instance_on_failure=True,
)
waiter_error = WaiterError(
@@ -140,6 +141,7 @@ class TestEC2CreateInstanceOperator(BaseEc2TestClass):
task_id="test_cleanup_failure_does_not_mask_error",
image_id=self._get_image_id(ec2_hook),
wait_for_completion=True,
+ terminate_instance_on_failure=True,
)
waiter_error = WaiterError(