Lee-W commented on code in PR #32355:
URL: https://github.com/apache/airflow/pull/32355#discussion_r1254066799


##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -224,6 +229,7 @@ def __init__(
         wait_for_completion: bool = False,
         aws_conn_id: str = DEFAULT_CONN_ID,
         region: str | None = None,
+        deferrable: bool = False,

Review Comment:
   After https://github.com/apache/airflow/pull/31712 is merged, we might want 
to use that feature and change the default value to the following.
   
   ```suggestion
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
   ```



##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
             subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
         )
 
+    def deferrable_create_cluster_next(self, context, event=None):
+        if event["status"] == "failed":
+            self.log.error("Cluster failed to start and will be torn down.")
+            self.eks_hook.delete_cluster(name=self.cluster_name)
+            self.defer(
+                trigger=EksDeleteClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                    region=self.region,
+                    delete_resources=False,
+                ),
+                method_name="execute_failed",
+                timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+            )
+        elif event["status"] == "success":
+            self.log.info("Cluster is ready to provision compute.")
+            _create_compute(
+                compute=self.compute,
+                cluster_name=self.cluster_name,
+                aws_conn_id=self.aws_conn_id,
+                region=self.region,
+                wait_for_completion=self.wait_for_completion,
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                nodegroup_name=self.nodegroup_name,
+                nodegroup_role_arn=self.nodegroup_role_arn,
+                create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+                fargate_profile_name=self.fargate_profile_name,
+                
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+                fargate_selectors=self.fargate_selectors,
+                
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+                subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
+            )
+            if self.compute == "fargate":
+                self.defer(
+                    trigger=EksCreateFargateProfileTrigger(
+                        cluster_name=self.cluster_name,
+                        fargate_profile_name=self.fargate_profile_name,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+            else:
+                self.defer(
+                    trigger=EksNodegroupTrigger(
+                        waiter_name="nodegroup_active",
+                        nodegroup_name=self.nodegroup_name,
+                        cluster_name=self.cluster_name,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+
+    def execute_failed(self, context, event=None):

Review Comment:
   nitpick
   
   ```suggestion
    def execute_failed(self, context: Context, event: dict[str, Any] | None = None) -> None:
   ```



##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
             subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
         )
 
+    def deferrable_create_cluster_next(self, context, event=None):
+        if event["status"] == "failed":
+            self.log.error("Cluster failed to start and will be torn down.")
+            self.eks_hook.delete_cluster(name=self.cluster_name)
+            self.defer(
+                trigger=EksDeleteClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                    region=self.region,
+                    delete_resources=False,
+                ),
+                method_name="execute_failed",
+                timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+            )
+        elif event["status"] == "success":
+            self.log.info("Cluster is ready to provision compute.")
+            _create_compute(
+                compute=self.compute,
+                cluster_name=self.cluster_name,
+                aws_conn_id=self.aws_conn_id,
+                region=self.region,
+                wait_for_completion=self.wait_for_completion,
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                nodegroup_name=self.nodegroup_name,
+                nodegroup_role_arn=self.nodegroup_role_arn,
+                create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+                fargate_profile_name=self.fargate_profile_name,
+                
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+                fargate_selectors=self.fargate_selectors,
+                
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+                subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
+            )
+            if self.compute == "fargate":
+                self.defer(
+                    trigger=EksCreateFargateProfileTrigger(
+                        cluster_name=self.cluster_name,
+                        fargate_profile_name=self.fargate_profile_name,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+            else:
+                self.defer(
+                    trigger=EksNodegroupTrigger(
+                        waiter_name="nodegroup_active",
+                        nodegroup_name=self.nodegroup_name,
+                        cluster_name=self.cluster_name,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+
+    def execute_failed(self, context, event=None):
+        if event["status"] == "delteted":
+            self.log.info("Cluster deleted")
+            raise event["exception"]
+
+    def execute_complete(self, context, event=None):

Review Comment:
   nitpick
   
   ```suggestion
    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
   ```



##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
             subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
         )
 
+    def deferrable_create_cluster_next(self, context, event=None):
+        if event["status"] == "failed":
+            self.log.error("Cluster failed to start and will be torn down.")
+            self.eks_hook.delete_cluster(name=self.cluster_name)
+            self.defer(
+                trigger=EksDeleteClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                    region=self.region,
+                    delete_resources=False,
+                ),
+                method_name="execute_failed",
+                timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+            )
+        elif event["status"] == "success":
+            self.log.info("Cluster is ready to provision compute.")
+            _create_compute(
+                compute=self.compute,
+                cluster_name=self.cluster_name,
+                aws_conn_id=self.aws_conn_id,
+                region=self.region,
+                wait_for_completion=self.wait_for_completion,
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                nodegroup_name=self.nodegroup_name,
+                nodegroup_role_arn=self.nodegroup_role_arn,
+                create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+                fargate_profile_name=self.fargate_profile_name,
+                
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+                fargate_selectors=self.fargate_selectors,
+                
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+                subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
+            )
+            if self.compute == "fargate":
+                self.defer(
+                    trigger=EksCreateFargateProfileTrigger(
+                        cluster_name=self.cluster_name,
+                        fargate_profile_name=self.fargate_profile_name,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+            else:
+                self.defer(
+                    trigger=EksNodegroupTrigger(
+                        waiter_name="nodegroup_active",
+                        nodegroup_name=self.nodegroup_name,
+                        cluster_name=self.cluster_name,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+
+    def execute_failed(self, context, event=None):
+        if event["status"] == "delteted":
+            self.log.info("Cluster deleted")
+            raise event["exception"]
+
+    def execute_complete(self, context, event=None):
+        resource = "fargate profile" if self.compute == "fargate" else 
self.compute
+        if event["status"] != "success":
+            raise AirflowException(f"Error creating {resource}: {event}")
+        else:
+            self.log.info("%s created successfully", resource)
+        return

Review Comment:
   ```suggestion
           self.log.info("%s created successfully", resource)
   ```



##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -582,21 +686,39 @@ def __init__(
         wait_for_completion: bool = False,
         aws_conn_id: str = DEFAULT_CONN_ID,
         region: str | None = None,
+        deferrable: bool = False,
+        waiter_delay: int = 30,
+        waiter_max_attempts: int = 40,
         **kwargs,
     ) -> None:
         self.cluster_name = cluster_name
         self.force_delete_compute = force_delete_compute
-        self.wait_for_completion = wait_for_completion
+        self.wait_for_completion = False if deferrable else wait_for_completion
         self.aws_conn_id = aws_conn_id
         self.region = region
+        self.deferrable = deferrable
+        self.waiter_delay = waiter_delay
+        self.waiter_max_attempts = waiter_max_attempts
         super().__init__(**kwargs)
 
     def execute(self, context: Context):
         eks_hook = EksHook(
             aws_conn_id=self.aws_conn_id,
             region_name=self.region,
         )
-
+        if self.deferrable:
+            self.defer(
+                trigger=EksDeleteClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                    region=self.region,
+                    force_delete_compute=self.force_delete_compute,
+                ),
+                method_name="execute_complete",
+                timeout=timedelta(seconds=self.waiter_delay * 
self.waiter_max_attempts),
+            )
         if self.force_delete_compute:
             self.delete_any_nodegroups(eks_hook)
             self.delete_any_fargate_profiles(eks_hook)

Review Comment:
   Should we add an else here for wrapping the rest of the code? 
   ```suggestion
               )
           else:
               if self.force_delete_compute:
                   self.delete_any_nodegroups(eks_hook)
                   self.delete_any_fargate_profiles(eks_hook)
   ```



##########
tests/providers/amazon/aws/operators/test_eks.py:
##########
@@ -336,6 +336,34 @@ def 
test_fargate_compute_missing_fargate_pod_execution_role_arn(self):
         ):
             missing_fargate_pod_execution_role_arn.execute({})
 
+    @mock.patch.object(EksHook, "create_cluster")
+    def test_eks_create_cluster_short_circuit_early(self, mock_create_cluster, 
caplog):
+        mock_create_cluster.return_value = None
+        eks_create_cluster_operator = EksCreateClusterOperator(
+            task_id=TASK_ID,
+            **self.create_cluster_params,
+            compute=None,
+            wait_for_completion=False,
+            deferrable=False,
+        )
+        eks_create_cluster_operator.execute({})
+        assert len(caplog.records) == 0
+
+    @mock.patch.object(EksHook, "create_cluster")
+    def test_eks_create_cluster_with_deferrable(self, mock_create_cluster, 
caplog):
+        mock_create_cluster.return_value = None
+
+        eks_create_cluster_operator = EksCreateClusterOperator(
+            task_id=TASK_ID,
+            **self.create_cluster_params,
+            compute=None,
+            wait_for_completion=False,
+            deferrable=True,
+        )
+        with pytest.raises(TaskDeferred):
+            eks_create_cluster_operator.execute({})
+        assert "Waiting for EKS Cluster to provision.  This will take some 
time." in caplog.messages

Review Comment:
   ```suggestion
           assert "Waiting for EKS Cluster to provision. This will take some 
time." in caplog.messages
   ```



##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
             subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
         )
 
+    def deferrable_create_cluster_next(self, context, event=None):
+        if event["status"] == "failed":
+            self.log.error("Cluster failed to start and will be torn down.")
+            self.eks_hook.delete_cluster(name=self.cluster_name)
+            self.defer(
+                trigger=EksDeleteClusterTrigger(
+                    cluster_name=self.cluster_name,
+                    waiter_delay=self.waiter_delay,
+                    waiter_max_attempts=self.waiter_max_attempts,
+                    aws_conn_id=self.aws_conn_id,
+                    region=self.region,
+                    delete_resources=False,
+                ),
+                method_name="execute_failed",
+                timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+            )
+        elif event["status"] == "success":
+            self.log.info("Cluster is ready to provision compute.")
+            _create_compute(
+                compute=self.compute,
+                cluster_name=self.cluster_name,
+                aws_conn_id=self.aws_conn_id,
+                region=self.region,
+                wait_for_completion=self.wait_for_completion,
+                waiter_delay=self.waiter_delay,
+                waiter_max_attempts=self.waiter_max_attempts,
+                nodegroup_name=self.nodegroup_name,
+                nodegroup_role_arn=self.nodegroup_role_arn,
+                create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+                fargate_profile_name=self.fargate_profile_name,
+                
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+                fargate_selectors=self.fargate_selectors,
+                
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+                subnets=cast(List[str], 
self.resources_vpc_config.get("subnetIds")),
+            )
+            if self.compute == "fargate":
+                self.defer(
+                    trigger=EksCreateFargateProfileTrigger(
+                        cluster_name=self.cluster_name,
+                        fargate_profile_name=self.fargate_profile_name,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+            else:
+                self.defer(
+                    trigger=EksNodegroupTrigger(
+                        waiter_name="nodegroup_active",
+                        nodegroup_name=self.nodegroup_name,
+                        cluster_name=self.cluster_name,
+                        aws_conn_id=self.aws_conn_id,
+                        region=self.region,
+                        waiter_delay=self.waiter_delay,
+                        waiter_max_attempts=self.waiter_max_attempts,
+                    ),
+                    method_name="execute_complete",
+                    timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
+                )
+
+    def execute_failed(self, context, event=None):
+        if event["status"] == "delteted":

Review Comment:
   According to 
https://github.com/apache/airflow/blob/c042ee68b3cd94300d4b422b81b78d0910a4c26a/airflow/providers/amazon/aws/triggers/eks.py#L163,
 it seems to be the only case, do we need this check? What happens if the
status is not "deleted"? Also note the typo: the code compares against
"delteted", so this branch can never match the trigger's "deleted" event.



##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -582,21 +686,39 @@ def __init__(
         wait_for_completion: bool = False,
         aws_conn_id: str = DEFAULT_CONN_ID,
         region: str | None = None,
+        deferrable: bool = False,

Review Comment:
   After https://github.com/apache/airflow/pull/31712 is merged, we might want 
to use that feature and change the default value to the following.
   
   ```suggestion
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
   ```



##########
tests/providers/amazon/aws/triggers/test_eks.py:
##########
@@ -453,3 +455,403 @@ async def 
test_eks_nodegroup_trigger_run_attempts_failed(self, mock_async_conn,
 
         assert "Error checking nodegroup" in str(exc.value)
         assert a_mock.get_waiter().wait.call_count == 3
+
+
+class TestEksCreateClusterTrigger:
+    def test_eks_create_cluster_trigger_serialize(self):

Review Comment:
   As we already wrap these test cases as a class, not sure whether we still
need the `eks_create_cluster_trigger` prefix
   
   ```suggestion
       def test_serialize(self):
   ```



##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -273,12 +280,28 @@ def execute(self, context: Context):
 
         # Short circuit early if we don't need to wait to attach compute
         # and the caller hasn't requested to wait for the cluster either.
-        if not self.compute and not self.wait_for_completion:
+        # if not self.compute and not self.wait_for_completion and not 
self.deferrable:
+        #     return None
+        if not any([self.compute, self.wait_for_completion, self.deferrable]):
             return None
 
         self.log.info("Waiting for EKS Cluster to provision.  This will take 
some time.")

Review Comment:
   ```suggestion
           self.log.info("Waiting for EKS Cluster to provision. This will take 
some time.")
   ```



##########
tests/providers/amazon/aws/triggers/test_eks.py:
##########
@@ -453,3 +455,403 @@ async def 
test_eks_nodegroup_trigger_run_attempts_failed(self, mock_async_conn,
 
         assert "Error checking nodegroup" in str(exc.value)
         assert a_mock.get_waiter().wait.call_count == 3
+
+
+class TestEksCreateClusterTrigger:
+    def test_eks_create_cluster_trigger_serialize(self):
+        eks_create_cluster_trigger = EksCreateClusterTrigger(
+            waiter_name="test_waiter_name",
+            cluster_name=TEST_CLUSTER_IDENTIFIER,
+            aws_conn_id=TEST_AWS_CONN_ID,
+            waiter_delay=TEST_WAITER_DELAY,
+            waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+            region=TEST_REGION,
+        )
+
+        class_path, args = eks_create_cluster_trigger.serialize()
+        assert class_path == 
"airflow.providers.amazon.aws.triggers.eks.EksCreateClusterTrigger"
+        assert args["waiter_name"] == "test_waiter_name"
+        assert args["cluster_name"] == TEST_CLUSTER_IDENTIFIER
+        assert args["aws_conn_id"] == TEST_AWS_CONN_ID
+        assert args["waiter_delay"] == str(TEST_WAITER_DELAY)
+        assert args["waiter_max_attempts"] == str(TEST_WAITER_MAX_ATTEMPTS)
+        assert args["region"] == TEST_REGION
+
+    @pytest.mark.asyncio
+    @mock.patch.object(EksHook, "async_conn")
+    async def test_eks_create_cluster_trigger_run(self, mock_async_conn):
+        a_mock = mock.MagicMock()
+        mock_async_conn.__aenter__.return_value = a_mock
+
+        a_mock.get_waiter().wait = AsyncMock()
+
+        eks_create_cluster_trigger = EksCreateClusterTrigger(
+            waiter_name="test_waiter_name",
+            cluster_name=TEST_CLUSTER_IDENTIFIER,
+            aws_conn_id=TEST_AWS_CONN_ID,
+            waiter_delay=TEST_WAITER_DELAY,
+            waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+            region=TEST_REGION,
+        )
+
+        generator = eks_create_cluster_trigger.run()
+        response = await generator.asend(None)
+
+        assert response == TriggerEvent(
+            {
+                "status": "success",
+            }
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep")
+    @mock.patch.object(EksHook, "async_conn")
+    async def test_eks_create_cluster_trigger_run_multiple_attempts(self, 
mock_async_conn, mock_sleep):
+        mock_sleep.return_value = True
+        a_mock = mock.MagicMock()
+        mock_async_conn.__aenter__.return_value = a_mock
+        error = WaiterError(
+            name="test_name",
+            reason="test_reason",
+            last_response={"cluster": {"status": "CREATING"}},
+        )
+        a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, error, 
True])
+
+        eks_create_cluster_trigger = EksCreateClusterTrigger(
+            waiter_name="test_waiter_name",
+            cluster_name=TEST_CLUSTER_IDENTIFIER,
+            aws_conn_id=TEST_AWS_CONN_ID,
+            waiter_delay=TEST_WAITER_DELAY,
+            waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+            region=TEST_REGION,
+        )
+
+        generator = eks_create_cluster_trigger.run()
+        response = await generator.asend(None)
+        assert a_mock.get_waiter().wait.call_count == 4
+        assert response == TriggerEvent(
+            {
+                "status": "success",
+            }
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep")
+    @mock.patch.object(EksHook, "async_conn")
+    async def test_eks_create_cluster_trigger_run_attempts_exceeded(self, 
mock_async_conn, mock_sleep):
+        mock_sleep.return_value = True
+        a_mock = mock.MagicMock()
+        mock_async_conn.__aenter__.return_value = a_mock
+        error = WaiterError(
+            name="test_name",
+            reason="test_reason",
+            last_response={"cluster": {"status": "CREATING"}},
+        )
+        a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, error, 
True])
+
+        eks_create_cluster_trigger = EksCreateClusterTrigger(
+            waiter_name="test_waiter_name",
+            cluster_name=TEST_CLUSTER_IDENTIFIER,
+            aws_conn_id=TEST_AWS_CONN_ID,
+            waiter_delay=TEST_WAITER_DELAY,
+            waiter_max_attempts=2,
+            region=TEST_REGION,
+        )
+
+        with pytest.raises(AirflowException) as exc:
+            generator = eks_create_cluster_trigger.run()
+            await generator.asend(None)
+        assert "Waiter error: max attempts reached" in str(exc)
+        assert a_mock.get_waiter().wait.call_count == 2
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep")
+    @mock.patch.object(EksHook, "async_conn")
+    async def test_eks_create_cluster_trigger_run_attempts_failed(self, 
mock_async_conn, mock_sleep):
+        mock_sleep.return_value = True
+        a_mock = mock.MagicMock()
+        mock_async_conn.__aenter__.return_value = a_mock
+        error_creating = WaiterError(
+            name="test_name",
+            reason="test_reason",
+            last_response={"cluster": {"status": "CREATING"}},
+        )
+        error_failed = WaiterError(
+            name="test_name",
+            reason="Waiter encountered a terminal failure state:",
+            last_response={"cluster": {"status": "FAILED"}},
+        )
+        a_mock.get_waiter().wait = AsyncMock(side_effect=[error_creating, 
error_creating, error_failed])
+
+        AirflowException("Error checking Eks cluster")
+
+        eks_create_cluster_trigger = EksCreateClusterTrigger(
+            waiter_name="test_waiter_name",
+            cluster_name=TEST_CLUSTER_IDENTIFIER,
+            aws_conn_id=TEST_AWS_CONN_ID,
+            waiter_delay=TEST_WAITER_DELAY,
+            waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+            region=TEST_REGION,
+        )
+
+        generator = eks_create_cluster_trigger.run()
+        await generator.asend(None)
+
+        # assert response == TriggerEvent({"status": "failed", "exception": 
exc})
+        # from pdb import set_trace; set_trace()

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to