Lee-W commented on code in PR #32355:
URL: https://github.com/apache/airflow/pull/32355#discussion_r1254066799
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -224,6 +229,7 @@ def __init__(
wait_for_completion: bool = False,
aws_conn_id: str = DEFAULT_CONN_ID,
region: str | None = None,
+ deferrable: bool = False,
Review Comment:
After https://github.com/apache/airflow/pull/31712 is merged, we might want
to use that feature and change the default value to the following.
```suggestion
deferrable: bool = conf.getboolean("operators",
"default_deferrable", fallback=False),
```
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
)
+ def deferrable_create_cluster_next(self, context, event=None):
+ if event["status"] == "failed":
+ self.log.error("Cluster failed to start and will be torn down.")
+ self.eks_hook.delete_cluster(name=self.cluster_name)
+ self.defer(
+ trigger=EksDeleteClusterTrigger(
+ cluster_name=self.cluster_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ delete_resources=False,
+ ),
+ method_name="execute_failed",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ elif event["status"] == "success":
+ self.log.info("Cluster is ready to provision compute.")
+ _create_compute(
+ compute=self.compute,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ wait_for_completion=self.wait_for_completion,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ nodegroup_name=self.nodegroup_name,
+ nodegroup_role_arn=self.nodegroup_role_arn,
+ create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+ fargate_profile_name=self.fargate_profile_name,
+
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+ fargate_selectors=self.fargate_selectors,
+
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+ subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
+ )
+ if self.compute == "fargate":
+ self.defer(
+ trigger=EksCreateFargateProfileTrigger(
+ cluster_name=self.cluster_name,
+ fargate_profile_name=self.fargate_profile_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ else:
+ self.defer(
+ trigger=EksNodegroupTrigger(
+ waiter_name="nodegroup_active",
+ nodegroup_name=self.nodegroup_name,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+
+ def execute_failed(self, context, event=None):
Review Comment:
nitpick
```suggestion
def execute_failed(self, context: Context, event: dict[str, Any] |
None = None) -> None:
```
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
)
+ def deferrable_create_cluster_next(self, context, event=None):
+ if event["status"] == "failed":
+ self.log.error("Cluster failed to start and will be torn down.")
+ self.eks_hook.delete_cluster(name=self.cluster_name)
+ self.defer(
+ trigger=EksDeleteClusterTrigger(
+ cluster_name=self.cluster_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ delete_resources=False,
+ ),
+ method_name="execute_failed",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ elif event["status"] == "success":
+ self.log.info("Cluster is ready to provision compute.")
+ _create_compute(
+ compute=self.compute,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ wait_for_completion=self.wait_for_completion,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ nodegroup_name=self.nodegroup_name,
+ nodegroup_role_arn=self.nodegroup_role_arn,
+ create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+ fargate_profile_name=self.fargate_profile_name,
+
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+ fargate_selectors=self.fargate_selectors,
+
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+ subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
+ )
+ if self.compute == "fargate":
+ self.defer(
+ trigger=EksCreateFargateProfileTrigger(
+ cluster_name=self.cluster_name,
+ fargate_profile_name=self.fargate_profile_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ else:
+ self.defer(
+ trigger=EksNodegroupTrigger(
+ waiter_name="nodegroup_active",
+ nodegroup_name=self.nodegroup_name,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+
+ def execute_failed(self, context, event=None):
+ if event["status"] == "delteted":
+ self.log.info("Cluster deleted")
+ raise event["exception"]
+
+ def execute_complete(self, context, event=None):
Review Comment:
nitpick
```suggestion
def execute_complete(self, context: Context, event: dict[str, Any] |
None = None) -> None:
```
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
)
+ def deferrable_create_cluster_next(self, context, event=None):
+ if event["status"] == "failed":
+ self.log.error("Cluster failed to start and will be torn down.")
+ self.eks_hook.delete_cluster(name=self.cluster_name)
+ self.defer(
+ trigger=EksDeleteClusterTrigger(
+ cluster_name=self.cluster_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ delete_resources=False,
+ ),
+ method_name="execute_failed",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ elif event["status"] == "success":
+ self.log.info("Cluster is ready to provision compute.")
+ _create_compute(
+ compute=self.compute,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ wait_for_completion=self.wait_for_completion,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ nodegroup_name=self.nodegroup_name,
+ nodegroup_role_arn=self.nodegroup_role_arn,
+ create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+ fargate_profile_name=self.fargate_profile_name,
+
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+ fargate_selectors=self.fargate_selectors,
+
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+ subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
+ )
+ if self.compute == "fargate":
+ self.defer(
+ trigger=EksCreateFargateProfileTrigger(
+ cluster_name=self.cluster_name,
+ fargate_profile_name=self.fargate_profile_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ else:
+ self.defer(
+ trigger=EksNodegroupTrigger(
+ waiter_name="nodegroup_active",
+ nodegroup_name=self.nodegroup_name,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+
+ def execute_failed(self, context, event=None):
+ if event["status"] == "delteted":
+ self.log.info("Cluster deleted")
+ raise event["exception"]
+
+ def execute_complete(self, context, event=None):
+ resource = "fargate profile" if self.compute == "fargate" else
self.compute
+ if event["status"] != "success":
+ raise AirflowException(f"Error creating {resource}: {event}")
+ else:
+ self.log.info("%s created successfully", resource)
+ return
Review Comment:
```suggestion
self.log.info("%s created successfully", resource)
```
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -582,21 +686,39 @@ def __init__(
wait_for_completion: bool = False,
aws_conn_id: str = DEFAULT_CONN_ID,
region: str | None = None,
+ deferrable: bool = False,
+ waiter_delay: int = 30,
+ waiter_max_attempts: int = 40,
**kwargs,
) -> None:
self.cluster_name = cluster_name
self.force_delete_compute = force_delete_compute
- self.wait_for_completion = wait_for_completion
+ self.wait_for_completion = False if deferrable else wait_for_completion
self.aws_conn_id = aws_conn_id
self.region = region
+ self.deferrable = deferrable
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
super().__init__(**kwargs)
def execute(self, context: Context):
eks_hook = EksHook(
aws_conn_id=self.aws_conn_id,
region_name=self.region,
)
-
+ if self.deferrable:
+ self.defer(
+ trigger=EksDeleteClusterTrigger(
+ cluster_name=self.cluster_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ force_delete_compute=self.force_delete_compute,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_delay *
self.waiter_max_attempts),
+ )
if self.force_delete_compute:
self.delete_any_nodegroups(eks_hook)
self.delete_any_fargate_profiles(eks_hook)
Review Comment:
Should we add an `else` here to wrap the rest of the code?
```suggestion
)
else:
if self.force_delete_compute:
self.delete_any_nodegroups(eks_hook)
self.delete_any_fargate_profiles(eks_hook)
```
##########
tests/providers/amazon/aws/operators/test_eks.py:
##########
@@ -336,6 +336,34 @@ def
test_fargate_compute_missing_fargate_pod_execution_role_arn(self):
):
missing_fargate_pod_execution_role_arn.execute({})
+ @mock.patch.object(EksHook, "create_cluster")
+ def test_eks_create_cluster_short_circuit_early(self, mock_create_cluster,
caplog):
+ mock_create_cluster.return_value = None
+ eks_create_cluster_operator = EksCreateClusterOperator(
+ task_id=TASK_ID,
+ **self.create_cluster_params,
+ compute=None,
+ wait_for_completion=False,
+ deferrable=False,
+ )
+ eks_create_cluster_operator.execute({})
+ assert len(caplog.records) == 0
+
+ @mock.patch.object(EksHook, "create_cluster")
+ def test_eks_create_cluster_with_deferrable(self, mock_create_cluster,
caplog):
+ mock_create_cluster.return_value = None
+
+ eks_create_cluster_operator = EksCreateClusterOperator(
+ task_id=TASK_ID,
+ **self.create_cluster_params,
+ compute=None,
+ wait_for_completion=False,
+ deferrable=True,
+ )
+ with pytest.raises(TaskDeferred):
+ eks_create_cluster_operator.execute({})
+ assert "Waiting for EKS Cluster to provision. This will take some
time." in caplog.messages
Review Comment:
```suggestion
assert "Waiting for EKS Cluster to provision. This will take some
time." in caplog.messages
```
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
)
+ def deferrable_create_cluster_next(self, context, event=None):
+ if event["status"] == "failed":
+ self.log.error("Cluster failed to start and will be torn down.")
+ self.eks_hook.delete_cluster(name=self.cluster_name)
+ self.defer(
+ trigger=EksDeleteClusterTrigger(
+ cluster_name=self.cluster_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ delete_resources=False,
+ ),
+ method_name="execute_failed",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ elif event["status"] == "success":
+ self.log.info("Cluster is ready to provision compute.")
+ _create_compute(
+ compute=self.compute,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ wait_for_completion=self.wait_for_completion,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ nodegroup_name=self.nodegroup_name,
+ nodegroup_role_arn=self.nodegroup_role_arn,
+ create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+ fargate_profile_name=self.fargate_profile_name,
+
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+ fargate_selectors=self.fargate_selectors,
+
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+ subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
+ )
+ if self.compute == "fargate":
+ self.defer(
+ trigger=EksCreateFargateProfileTrigger(
+ cluster_name=self.cluster_name,
+ fargate_profile_name=self.fargate_profile_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ else:
+ self.defer(
+ trigger=EksNodegroupTrigger(
+ waiter_name="nodegroup_active",
+ nodegroup_name=self.nodegroup_name,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+
+ def execute_failed(self, context, event=None):
+ if event["status"] == "delteted":
Review Comment:
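According to
https://github.com/apache/airflow/blob/c042ee68b3cd94300d4b422b81b78d0910a4c26a/airflow/providers/amazon/aws/triggers/eks.py#L163,
"deleted" seems to be the only status the trigger emits, so do we need this
check? And what happens if the status is not "deleted"? (Note that the literal
here is spelled "delteted", so as written this branch would never match.)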
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -582,21 +686,39 @@ def __init__(
wait_for_completion: bool = False,
aws_conn_id: str = DEFAULT_CONN_ID,
region: str | None = None,
+ deferrable: bool = False,
Review Comment:
After https://github.com/apache/airflow/pull/31712 is merged, we might want
to use that feature and change the default value to the following.
```suggestion
deferrable: bool = conf.getboolean("operators",
"default_deferrable", fallback=False),
```
##########
tests/providers/amazon/aws/triggers/test_eks.py:
##########
@@ -453,3 +455,403 @@ async def
test_eks_nodegroup_trigger_run_attempts_failed(self, mock_async_conn,
assert "Error checking nodegroup" in str(exc.value)
assert a_mock.get_waiter().wait.call_count == 3
+
+
+class TestEksCreateClusterTrigger:
+ def test_eks_create_cluster_trigger_serialize(self):
Review Comment:
Since these test cases are already grouped in a class, I'm not sure we still
need the `eks_create_cluster_trigger` prefix.
```suggestion
def test_serialize(self):
```
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -273,12 +280,28 @@ def execute(self, context: Context):
# Short circuit early if we don't need to wait to attach compute
# and the caller hasn't requested to wait for the cluster either.
- if not self.compute and not self.wait_for_completion:
+ # if not self.compute and not self.wait_for_completion and not
self.deferrable:
+ # return None
+ if not any([self.compute, self.wait_for_completion, self.deferrable]):
return None
self.log.info("Waiting for EKS Cluster to provision. This will take
some time.")
Review Comment:
```suggestion
self.log.info("Waiting for EKS Cluster to provision. This will take
some time.")
```
##########
tests/providers/amazon/aws/triggers/test_eks.py:
##########
@@ -453,3 +455,403 @@ async def
test_eks_nodegroup_trigger_run_attempts_failed(self, mock_async_conn,
assert "Error checking nodegroup" in str(exc.value)
assert a_mock.get_waiter().wait.call_count == 3
+
+
+class TestEksCreateClusterTrigger:
+ def test_eks_create_cluster_trigger_serialize(self):
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ class_path, args = eks_create_cluster_trigger.serialize()
+ assert class_path ==
"airflow.providers.amazon.aws.triggers.eks.EksCreateClusterTrigger"
+ assert args["waiter_name"] == "test_waiter_name"
+ assert args["cluster_name"] == TEST_CLUSTER_IDENTIFIER
+ assert args["aws_conn_id"] == TEST_AWS_CONN_ID
+ assert args["waiter_delay"] == str(TEST_WAITER_DELAY)
+ assert args["waiter_max_attempts"] == str(TEST_WAITER_MAX_ATTEMPTS)
+ assert args["region"] == TEST_REGION
+
+ @pytest.mark.asyncio
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run(self, mock_async_conn):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+
+ a_mock.get_waiter().wait = AsyncMock()
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ generator = eks_create_cluster_trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent(
+ {
+ "status": "success",
+ }
+ )
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run_multiple_attempts(self,
mock_async_conn, mock_sleep):
+ mock_sleep.return_value = True
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"cluster": {"status": "CREATING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, error,
True])
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ generator = eks_create_cluster_trigger.run()
+ response = await generator.asend(None)
+ assert a_mock.get_waiter().wait.call_count == 4
+ assert response == TriggerEvent(
+ {
+ "status": "success",
+ }
+ )
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run_attempts_exceeded(self,
mock_async_conn, mock_sleep):
+ mock_sleep.return_value = True
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"cluster": {"status": "CREATING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, error,
True])
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=2,
+ region=TEST_REGION,
+ )
+
+ with pytest.raises(AirflowException) as exc:
+ generator = eks_create_cluster_trigger.run()
+ await generator.asend(None)
+ assert "Waiter error: max attempts reached" in str(exc)
+ assert a_mock.get_waiter().wait.call_count == 2
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_cluster_trigger_run_attempts_failed(self,
mock_async_conn, mock_sleep):
+ mock_sleep.return_value = True
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error_creating = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"cluster": {"status": "CREATING"}},
+ )
+ error_failed = WaiterError(
+ name="test_name",
+ reason="Waiter encountered a terminal failure state:",
+ last_response={"cluster": {"status": "FAILED"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error_creating,
error_creating, error_failed])
+
+ AirflowException("Error checking Eks cluster")
+
+ eks_create_cluster_trigger = EksCreateClusterTrigger(
+ waiter_name="test_waiter_name",
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ waiter_delay=TEST_WAITER_DELAY,
+ waiter_max_attempts=TEST_WAITER_MAX_ATTEMPTS,
+ region=TEST_REGION,
+ )
+
+ generator = eks_create_cluster_trigger.run()
+ await generator.asend(None)
+
+ # assert response == TriggerEvent({"status": "failed", "exception":
exc})
+ # from pdb import set_trace; set_trace()
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]