This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d18c01a7a2 EC2 `CreateInstance`: terminate instances in on_kill (#36828)
d18c01a7a2 is described below

commit d18c01a7a2f04449262606d79b4f663bdb200414
Author: Achim Gädke <[email protected]>
AuthorDate: Fri Jan 19 08:26:36 2024 +1300

    EC2 `CreateInstance`: terminate instances in on_kill (#36828)
---
 airflow/providers/amazon/aws/operators/ec2.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/ec2.py b/airflow/providers/amazon/aws/operators/ec2.py
index 2dbb6986d7..797ba2b43e 100644
--- a/airflow/providers/amazon/aws/operators/ec2.py
+++ b/airflow/providers/amazon/aws/operators/ec2.py
@@ -183,22 +183,36 @@ class EC2CreateInstanceOperator(BaseOperator):
             MaxCount=self.max_count,
             **self.config,
         )["Instances"]
-        instance_ids = []
-        for instance in instances:
-            instance_ids.append(instance["InstanceId"])
-            self.log.info("Created EC2 instance %s", instance["InstanceId"])
+
+        instance_ids = self._on_kill_instance_ids = [instance["InstanceId"] for instance in instances]
+        for instance_id in instance_ids:
+            self.log.info("Created EC2 instance %s", instance_id)
 
             if self.wait_for_completion:
                 ec2_hook.get_waiter("instance_running").wait(
-                    InstanceIds=[instance["InstanceId"]],
+                    InstanceIds=[instance_id],
                     WaiterConfig={
                         "Delay": self.poll_interval,
                         "MaxAttempts": self.max_attempts,
                     },
                 )
 
+        # leave "_on_kill_instance_ids" in place for finishing post-processing
         return instance_ids
 
+    def on_kill(self) -> None:
+        instance_ids = getattr(self, "_on_kill_instance_ids", [])
+
+        if instance_ids:
+            self.log.info("on_kill: Terminating instance/s %s", ", ".join(instance_ids))
+            ec2_hook = EC2Hook(
+                aws_conn_id=self.aws_conn_id,
+                region_name=self.region_name,
+                api_type="client_type",
+            )
+            ec2_hook.conn.terminate_instances(InstanceIds=instance_ids)
+        super().on_kill()
+
 
 class EC2TerminateInstanceOperator(BaseOperator):
     """

Reply via email to