This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d18c01a7a2 EC2 `CreateInstance`: terminate instances in on_kill (#36828)
d18c01a7a2 is described below
commit d18c01a7a2f04449262606d79b4f663bdb200414
Author: Achim Gädke <[email protected]>
AuthorDate: Fri Jan 19 08:26:36 2024 +1300
EC2 `CreateInstance`: terminate instances in on_kill (#36828)
---
airflow/providers/amazon/aws/operators/ec2.py | 24 +++++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py
index 2dbb6986d7..797ba2b43e 100644
--- a/airflow/providers/amazon/aws/operators/ec2.py
+++ b/airflow/providers/amazon/aws/operators/ec2.py
@@ -183,22 +183,36 @@ class EC2CreateInstanceOperator(BaseOperator):
MaxCount=self.max_count,
**self.config,
)["Instances"]
- instance_ids = []
- for instance in instances:
- instance_ids.append(instance["InstanceId"])
- self.log.info("Created EC2 instance %s", instance["InstanceId"])
+
+ instance_ids = self._on_kill_instance_ids = [instance["InstanceId"] for instance in instances]
+ for instance_id in instance_ids:
+ self.log.info("Created EC2 instance %s", instance_id)
if self.wait_for_completion:
ec2_hook.get_waiter("instance_running").wait(
- InstanceIds=[instance["InstanceId"]],
+ InstanceIds=[instance_id],
WaiterConfig={
"Delay": self.poll_interval,
"MaxAttempts": self.max_attempts,
},
)
+ # leave "_on_kill_instance_ids" in place for finishing post-processing
return instance_ids
+ def on_kill(self) -> None:
+ instance_ids = getattr(self, "_on_kill_instance_ids", [])
+
+ if instance_ids:
+ self.log.info("on_kill: Terminating instance/s %s", ", ".join(instance_ids))
+ ec2_hook = EC2Hook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region_name,
+ api_type="client_type",
+ )
+ ec2_hook.conn.terminate_instances(InstanceIds=instance_ids)
+ super().on_kill()
+
class EC2TerminateInstanceOperator(BaseOperator):
"""