This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 113018ec99 Deferrable mode for EKS Create/Delete Operator (#32355)
113018ec99 is described below

commit 113018ec99e5d414ac0abf5bc29431bfa8f070bb
Author: Syed Hussain <[email protected]>
AuthorDate: Mon Jul 17 14:57:25 2023 -0700

    Deferrable mode for EKS Create/Delete Operator (#32355)
---
 airflow/providers/amazon/aws/operators/eks.py      | 147 ++++++++++++++++-
 airflow/providers/amazon/aws/triggers/eks.py       | 173 ++++++++++++++++++++-
 .../operators/eks.rst                              |   2 +
 tests/providers/amazon/aws/operators/test_eks.py   |  33 ++++
 tests/providers/amazon/aws/triggers/test_eks.py    |  17 ++
 .../aws/example_eks_with_fargate_in_one_step.py    |   3 +
 6 files changed, 367 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/eks.py 
b/airflow/providers/amazon/aws/operators/eks.py
index 6858f80112..e7931cc07a 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -21,7 +21,7 @@ import logging
 import warnings
 from ast import literal_eval
 from datetime import timedelta
-from typing import TYPE_CHECKING, List, Sequence, cast
+from typing import TYPE_CHECKING, Any, List, Sequence, cast
 
 from botocore.exceptions import ClientError, WaiterError
 
@@ -30,8 +30,10 @@ from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarni
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.eks import EksHook
 from airflow.providers.amazon.aws.triggers.eks import (
+    EksCreateClusterTrigger,
     EksCreateFargateProfileTrigger,
     EksCreateNodegroupTrigger,
+    EksDeleteClusterTrigger,
     EksDeleteFargateProfileTrigger,
     EksDeleteNodegroupTrigger,
 )
@@ -187,6 +189,9 @@ class EksCreateClusterOperator(BaseOperator):
          (templated)
     :param waiter_delay: Time (in seconds) to wait between two consecutive 
calls to check cluster state
     :param waiter_max_attempts: The maximum number of attempts to check 
cluster state
+    :param deferrable: If True, the operator will wait asynchronously for the 
cluster to be created.
+        This implies waiting for completion. This mode requires the aiobotocore 
module to be installed.
+        (default: False)
 
     """
 
@@ -225,6 +230,7 @@ class EksCreateClusterOperator(BaseOperator):
         wait_for_completion: bool = False,
         aws_conn_id: str = DEFAULT_CONN_ID,
         region: str | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         waiter_delay: int = 30,
         waiter_max_attempts: int = 40,
         **kwargs,
@@ -237,7 +243,7 @@ class EksCreateClusterOperator(BaseOperator):
         self.nodegroup_role_arn = nodegroup_role_arn
         self.fargate_pod_execution_role_arn = fargate_pod_execution_role_arn
         self.create_fargate_profile_kwargs = create_fargate_profile_kwargs or 
{}
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion
         self.waiter_delay = waiter_delay
         self.waiter_max_attempts = waiter_max_attempts
         self.aws_conn_id = aws_conn_id
@@ -246,6 +252,7 @@ class EksCreateClusterOperator(BaseOperator):
         self.create_nodegroup_kwargs = create_nodegroup_kwargs or {}
         self.fargate_selectors = fargate_selectors or [{"namespace": 
DEFAULT_NAMESPACE_NAME}]
         self.fargate_profile_name = fargate_profile_name
+        self.deferrable = deferrable
         super().__init__(
             **kwargs,
         )
@@ -274,12 +281,25 @@ class EksCreateClusterOperator(BaseOperator):
 
         # Short circuit early if we don't need to wait to attach compute
         # and the caller hasn't requested to wait for the cluster either.
-        if not self.compute and not self.wait_for_completion:
+        if not any([self.compute, self.wait_for_completion, self.deferrable]):
             return None
 
-        self.log.info("Waiting for EKS Cluster to provision.  This will take 
some time.")
+        self.log.info("Waiting for EKS Cluster to provision. This will take 
some time.")
         client = self.eks_hook.conn
 
+        if self.deferrable:
+            self.defer(
+                trigger=EksCreateClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    aws_conn_id=self.aws_conn_id,
+                    region_name=self.region,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                ),
+                method_name="deferrable_create_cluster_next",
+                timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+            )
+
         try:
             client.get_waiter("cluster_active").wait(
                 name=self.cluster_name,
@@ -311,6 +331,89 @@ class EksCreateClusterOperator(BaseOperator):
             subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
         )
 
+    def deferrable_create_cluster_next(self, context: Context, event: 
dict[str, Any] | None = None) -> None:
+        if event is None:
+            self.log.error("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] == "failed":
+            self.log.error("Cluster failed to start and will be torn down.")
+            self.eks_hook.delete_cluster(name=self.cluster_name)
+            self.defer(
+                trigger=EksDeleteClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                    region_name=self.region,
+                    force_delete_compute=False,
+                ),
+                method_name="execute_failed",
+                timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+            )
+        elif event["status"] == "success":
+            self.log.info("Cluster is ready to provision compute.")
+            _create_compute(
+                compute=self.compute,
+                cluster_name=self.cluster_name,
+                aws_conn_id=self.aws_conn_id,
+                region=self.region,
+                wait_for_completion=self.wait_for_completion,
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                nodegroup_name=self.nodegroup_name,
+                nodegroup_role_arn=self.nodegroup_role_arn,
+                create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+                fargate_profile_name=self.fargate_profile_name,
+                
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+                fargate_selectors=self.fargate_selectors,
+                
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+                subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
+            )
+            if self.compute == "fargate":
+                self.defer(
+                    trigger=EksCreateFargateProfileTrigger(
+                        cluster_name=self.cluster_name,
+                        fargate_profile_name=self.fargate_profile_name,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+            else:
+                self.defer(
+                    trigger=EksCreateNodegroupTrigger(
+                        nodegroup_name=self.nodegroup_name,
+                        cluster_name=self.cluster_name,
+                        aws_conn_id=self.aws_conn_id,
+                        region_name=self.region,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+
+    def execute_failed(self, context: Context, event: dict[str, Any] | None = 
None) -> None:
+        if event is None:
+            self.log.info("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] == "delteted":
+            self.log.info("Cluster deleted")
+            raise event["exception"]
+
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> None:
+        resource = "fargate profile" if self.compute == "fargate" else 
self.compute
+        if event is None:
+            self.log.info("Trigger error: event is None")
+            raise AirflowException("Trigger error: event is None")
+        elif event["status"] != "success":
+            raise AirflowException(f"Error creating {resource}: {event}")
+
+        self.log.info("%s created successfully", resource)
+
 
 class EksCreateNodegroupOperator(BaseOperator):
     """
@@ -564,6 +667,11 @@ class EksDeleteClusterOperator(BaseOperator):
          maintained on each worker node).
     :param region: Which AWS region the connection should use. (templated)
         If this is None or empty then the default boto3 behaviour is used.
+    :param waiter_delay: Time (in seconds) to wait between two consecutive 
calls to check cluster state
+    :param waiter_max_attempts: The maximum number of attempts to check 
cluster state
+    :param deferrable: If True, the operator will wait asynchronously for the 
cluster to be deleted.
+        This implies waiting for completion. This mode requires the aiobotocore 
module to be installed.
+        (default: False)
 
     """
 
@@ -582,13 +690,19 @@ class EksDeleteClusterOperator(BaseOperator):
         wait_for_completion: bool = False,
         aws_conn_id: str = DEFAULT_CONN_ID,
         region: str | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 40,
         **kwargs,
     ) -> None:
         self.cluster_name = cluster_name
         self.force_delete_compute = force_delete_compute
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion
         self.aws_conn_id = aws_conn_id
         self.region = region
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
         super().__init__(**kwargs)
 
     def execute(self, context: Context):
@@ -596,8 +710,20 @@ class EksDeleteClusterOperator(BaseOperator):
             aws_conn_id=self.aws_conn_id,
             region_name=self.region,
         )
-
-        if self.force_delete_compute:
+        if self.deferrable:
+            self.defer(
+                trigger=EksDeleteClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                    region_name=self.region,
+                    force_delete_compute=self.force_delete_compute,
+                ),
+                method_name="execute_complete",
+                timeout=timedelta(seconds=self.waiter_delay * 
self.waiter_max_attempts),
+            )
+        elif self.force_delete_compute:
             self.delete_any_nodegroups(eks_hook)
             self.delete_any_fargate_profiles(eks_hook)
 
@@ -645,6 +771,13 @@ class EksDeleteClusterOperator(BaseOperator):
                 )
         self.log.info(SUCCESS_MSG.format(compute=FARGATE_FULL_NAME))
 
+    def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> None:
+        if event is None:
+            self.log.error("Trigger error. Event is None")
+            raise AirflowException("Trigger error. Event is None")
+        elif event["status"] == "success":
+            self.log.info("Cluster deleted successfully.")
+
 
 class EksDeleteNodegroupOperator(BaseOperator):
     """
diff --git a/airflow/providers/amazon/aws/triggers/eks.py 
b/airflow/providers/amazon/aws/triggers/eks.py
index a6fb75eb80..ff99b51200 100644
--- a/airflow/providers/amazon/aws/triggers/eks.py
+++ b/airflow/providers/amazon/aws/triggers/eks.py
@@ -17,11 +17,178 @@
 from __future__ import annotations
 
 import warnings
+from typing import Any
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
 from airflow.providers.amazon.aws.hooks.eks import EksHook
 from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
+from airflow.providers.amazon.aws.utils.waiter_with_logging import async_wait
+from airflow.triggers.base import TriggerEvent
+
+
+class EksCreateClusterTrigger(AwsBaseWaiterTrigger):
+    """
+    Trigger for EksCreateClusterOperator.
+
+    The trigger will asynchronously wait for the cluster to be created.
+
+    :param cluster_name: The name of the EKS cluster
+    :param waiter_delay: The amount of time in seconds to wait between 
attempts.
+    :param waiter_max_attempts: The maximum number of attempts to be made.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :param region_name: Which AWS region the connection should use.
+         If this is None or empty then the default boto3 behaviour is used.
+    """
+
+    def __init__(
+        self,
+        cluster_name: str,
+        waiter_delay: int,
+        waiter_max_attempts: int,
+        aws_conn_id: str,
+        region_name: str | None,
+    ):
+        super().__init__(
+            serialized_fields={"cluster_name": cluster_name, "region_name": 
region_name},
+            waiter_name="cluster_active",
+            waiter_args={"name": cluster_name},
+            failure_message="Error checking Eks cluster",
+            status_message="Eks cluster status is",
+            status_queries=["cluster.status"],
+            return_value=None,
+            waiter_delay=waiter_delay,
+            waiter_max_attempts=waiter_max_attempts,
+            aws_conn_id=aws_conn_id,
+            region_name=region_name,
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EksHook(aws_conn_id=self.aws_conn_id, 
region_name=self.region_name)
+
+
+class EksDeleteClusterTrigger(AwsBaseWaiterTrigger):
+    """
+    Trigger for EksDeleteClusterOperator.
+
+    The trigger will asynchronously wait for the cluster to be deleted. If
+    ``force_delete_compute`` is True, any nodegroups or fargate profiles
+    associated with the cluster are deleted before the cluster is deleted.
+
+    :param cluster_name: The name of the EKS cluster
+    :param waiter_delay: The amount of time in seconds to wait between 
attempts.
+    :param waiter_max_attempts: The maximum number of attempts to be made.
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    :param region_name: Which AWS region the connection should use.
+         If this is None or empty then the default boto3 behaviour is used.
+    :param force_delete_compute: If True, any nodegroups or fargate profiles 
associated
+        with the cluster will be deleted before the cluster is deleted.
+    """
+
+    def __init__(
+        self,
+        cluster_name,
+        waiter_delay: int,
+        waiter_max_attempts: int,
+        aws_conn_id: str,
+        region_name: str | None,
+        force_delete_compute: bool,
+    ):
+        self.cluster_name = cluster_name
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
+        self.aws_conn_id = aws_conn_id
+        self.region_name = region_name
+        self.force_delete_compute = force_delete_compute
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        return (
+            self.__class__.__module__ + "." + self.__class__.__qualname__,
+            {
+                "cluster_name": self.cluster_name,
+                "waiter_delay": str(self.waiter_delay),
+                "waiter_max_attempts": str(self.waiter_max_attempts),
+                "aws_conn_id": self.aws_conn_id,
+                "region_name": self.region_name,
+                "force_delete_compute": self.force_delete_compute,
+            },
+        )
+
+    def hook(self) -> AwsGenericHook:
+        return EksHook(aws_conn_id=self.aws_conn_id, 
region_name=self.region_name)
+
+    async def run(self):
+        async with self.hook.async_conn as client:
+            waiter = client.get_waiter("cluster_deleted")
+            if self.force_delete_compute:
+                await self.delete_any_nodegroups(client=client)
+                await self.delete_any_fargate_profiles(client=client)
+                await client.delete_cluster(name=self.cluster_name)
+            await async_wait(
+                waiter=waiter,
+                waiter_delay=int(self.waiter_delay),
+                waiter_max_attempts=int(self.waiter_max_attempts),
+                args={"name": self.cluster_name},
+                failure_message="Error deleting cluster",
+                status_message="Status of cluster is",
+                status_args=["cluster.status"],
+            )
+
+        yield TriggerEvent({"status": "deleted"})
+
+    async def delete_any_nodegroups(self, client) -> None:
+        """
+        Deletes all EKS Nodegroups for a provided Amazon EKS Cluster.
+
+        All the EKS Nodegroups are deleted simultaneously. We wait for
+        all Nodegroups to be deleted before returning.
+        """
+        nodegroups = await 
client.list_nodegroups(clusterName=self.cluster_name)
+        if nodegroups.get("nodegroups", None):
+            self.log.info("Deleting nodegroups")
+            # ignoring attr-defined here because aws_base hook defines 
get_waiter for all hooks
+            waiter = self.hook.get_waiter(  # type: ignore[attr-defined]
+                "all_nodegroups_deleted", deferrable=True, client=client
+            )
+            for group in nodegroups["nodegroups"]:
+                await client.delete_nodegroup(clusterName=self.cluster_name, 
nodegroupName=group)
+            await async_wait(
+                waiter=waiter,
+                waiter_delay=int(self.waiter_delay),
+                waiter_max_attempts=int(self.waiter_max_attempts),
+                args={"clusterName": self.cluster_name},
+                failure_message=f"Error deleting nodegroup for cluster 
{self.cluster_name}",
+                status_message="Deleting nodegroups associated with the 
cluster",
+                status_args=["nodegroups"],
+            )
+            self.log.info("All nodegroups deleted")
+        else:
+            self.log.info("No nodegroups associated with cluster %s", 
self.cluster_name)
+
+    async def delete_any_fargate_profiles(self, client) -> None:
+        """
+        Deletes all EKS Fargate profiles for a provided Amazon EKS Cluster.
+
+        EKS Fargate profiles must be deleted one at a time, so we must wait
+        for one to be deleted before sending the next delete command.
+        """
+        fargate_profiles = await 
client.list_fargate_profiles(clusterName=self.cluster_name)
+        if fargate_profiles.get("fargateProfileNames"):
+            self.log.info("Waiting for Fargate profiles to delete.  This will 
take some time.")
+            for profile in fargate_profiles["fargateProfileNames"]:
+                await 
client.delete_fargate_profile(clusterName=self.cluster_name, 
fargateProfileName=profile)
+                await async_wait(
+                    waiter=client.get_waiter("fargate_profile_deleted"),
+                    waiter_delay=int(self.waiter_delay),
+                    waiter_max_attempts=int(self.waiter_max_attempts),
+                    args={"clusterName": self.cluster_name, 
"fargateProfileName": profile},
+                    failure_message=f"Error deleting fargate profile for 
cluster {self.cluster_name}",
+                    status_message="Status of fargate profile is",
+                    status_args=["fargateProfile.status"],
+                )
+            self.log.info("All Fargate profiles deleted")
+        else:
+            self.log.info(f"No Fargate profiles associated with cluster 
{self.cluster_name}")
 
 
 class EksCreateFargateProfileTrigger(AwsBaseWaiterTrigger):
@@ -145,7 +312,11 @@ class EksCreateNodegroupTrigger(AwsBaseWaiterTrigger):
         region_name: str | None,
     ):
         super().__init__(
-            serialized_fields={"cluster_name": cluster_name, "nodegroup_name": 
nodegroup_name},
+            serialized_fields={
+                "cluster_name": cluster_name,
+                "nodegroup_name": nodegroup_name,
+                "region_name": region_name,
+            },
             waiter_name="nodegroup_active",
             waiter_args={"clusterName": cluster_name, "nodegroupName": 
nodegroup_name},
             failure_message="Error creating nodegroup",
diff --git a/docs/apache-airflow-providers-amazon/operators/eks.rst 
b/docs/apache-airflow-providers-amazon/operators/eks.rst
index cff682db2a..9f1cc9df61 100644
--- a/docs/apache-airflow-providers-amazon/operators/eks.rst
+++ b/docs/apache-airflow-providers-amazon/operators/eks.rst
@@ -76,6 +76,7 @@ Create an Amazon EKS cluster and AWS Fargate profile in one 
step
 
 To create an Amazon EKS cluster and an AWS Fargate profile in one command, you 
can use
 :class:`~airflow.providers.amazon.aws.operators.eks.EksCreateClusterOperator`.
+You can also run this operator in deferrable mode by setting the ``deferrable`` 
parameter to ``True``.
 
 Note: An AWS IAM role with the following permissions is required:
   ``ec2.amazon.aws.com`` must be in the Trusted Relationships
@@ -97,6 +98,7 @@ Delete an Amazon EKS Cluster
 
 To delete an existing Amazon EKS Cluster you can use
 :class:`~airflow.providers.amazon.aws.operators.eks.EksDeleteClusterOperator`.
+You can also run this operator in deferrable mode by setting the ``deferrable`` 
parameter to ``True``.
 
 .. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
     :language: python
diff --git a/tests/providers/amazon/aws/operators/test_eks.py 
b/tests/providers/amazon/aws/operators/test_eks.py
index 8381a26ef1..e537ab20a3 100644
--- a/tests/providers/amazon/aws/operators/test_eks.py
+++ b/tests/providers/amazon/aws/operators/test_eks.py
@@ -337,6 +337,34 @@ class TestEksCreateClusterOperator:
         ):
             missing_fargate_pod_execution_role_arn.execute({})
 
+    @mock.patch.object(EksHook, "create_cluster")
+    def test_eks_create_cluster_short_circuit_early(self, mock_create_cluster, 
caplog):
+        mock_create_cluster.return_value = None
+        eks_create_cluster_operator = EksCreateClusterOperator(
+            task_id=TASK_ID,
+            **self.create_cluster_params,
+            compute=None,
+            wait_for_completion=False,
+            deferrable=False,
+        )
+        eks_create_cluster_operator.execute({})
+        assert len(caplog.records) == 0
+
+    @mock.patch.object(EksHook, "create_cluster")
+    def test_eks_create_cluster_with_deferrable(self, mock_create_cluster, 
caplog):
+        mock_create_cluster.return_value = None
+
+        eks_create_cluster_operator = EksCreateClusterOperator(
+            task_id=TASK_ID,
+            **self.create_cluster_params,
+            compute=None,
+            wait_for_completion=False,
+            deferrable=True,
+        )
+        with pytest.raises(TaskDeferred):
+            eks_create_cluster_operator.execute({})
+        assert "Waiting for EKS Cluster to provision. This will take some 
time." in caplog.messages
+
 
 class TestEksCreateFargateProfileOperator:
     def setup_method(self) -> None:
@@ -542,6 +570,11 @@ class TestEksDeleteClusterOperator:
         mock_waiter.assert_called_with(mock.ANY, name=CLUSTER_NAME)
         assert_expected_waiter_type(mock_waiter, "ClusterDeleted")
 
+    def test_eks_delete_cluster_operator_with_deferrable(self):
+        self.delete_cluster_operator.deferrable = True
+        with pytest.raises(TaskDeferred):
+            self.delete_cluster_operator.execute({})
+
 
 class TestEksDeleteNodegroupOperator:
     def setup_method(self) -> None:
diff --git a/tests/providers/amazon/aws/triggers/test_eks.py 
b/tests/providers/amazon/aws/triggers/test_eks.py
index 045519aea5..023f8d2a97 100644
--- a/tests/providers/amazon/aws/triggers/test_eks.py
+++ b/tests/providers/amazon/aws/triggers/test_eks.py
@@ -19,8 +19,10 @@ from __future__ import annotations
 import pytest
 
 from airflow.providers.amazon.aws.triggers.eks import (
+    EksCreateClusterTrigger,
     EksCreateFargateProfileTrigger,
     EksCreateNodegroupTrigger,
+    EksDeleteClusterTrigger,
     EksDeleteFargateProfileTrigger,
     EksDeleteNodegroupTrigger,
 )
@@ -68,6 +70,21 @@ class TestEksTriggers:
                 waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
                 region_name=TEST_REGION,
             ),
+            EksCreateClusterTrigger(
+                cluster_name=TEST_CLUSTER_IDENTIFIER,
+                waiter_delay=TEST_WAITER_DELAY,
+                waiter_max_attempts=TEST_WAITER_DELAY,
+                aws_conn_id=TEST_AWS_CONN_ID,
+                region_name=TEST_REGION,
+            ),
+            EksDeleteClusterTrigger(
+                cluster_name=TEST_CLUSTER_IDENTIFIER,
+                waiter_delay=TEST_WAITER_DELAY,
+                waiter_max_attempts=TEST_WAITER_DELAY,
+                aws_conn_id=TEST_AWS_CONN_ID,
+                region_name=TEST_REGION,
+                force_delete_compute=True,
+            ),
         ],
     )
     def test_serialize_recreate(self, trigger):
diff --git 
a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py 
b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
index ae67a26588..6b9907d836 100644
--- a/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
+++ b/tests/system/providers/amazon/aws/example_eks_with_fargate_in_one_step.py
@@ -81,6 +81,9 @@ with DAG(
         # Opting to use the same ARN for the cluster and the pod here,
         # but a different ARN could be configured and passed if desired.
         fargate_pod_execution_role_arn=fargate_pod_role_arn,
+        deferrable=True,
+        waiter_delay=30,
+        wait_for_completion=399,
     )
     # [END howto_operator_eks_create_cluster_with_fargate_profile]
 

Reply via email to