syedahsn commented on code in PR #32355:
URL: https://github.com/apache/airflow/pull/32355#discussion_r1260180873
##########
airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -310,6 +333,82 @@ def execute(self, context: Context):
subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
)
+ def deferrable_create_cluster_next(self, context, event=None):
+ if event["status"] == "failed":
+ self.log.error("Cluster failed to start and will be torn down.")
+ self.eks_hook.delete_cluster(name=self.cluster_name)
+ self.defer(
+ trigger=EksDeleteClusterTrigger(
+ cluster_name=self.cluster_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ delete_resources=False,
+ ),
+ method_name="execute_failed",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ elif event["status"] == "success":
+ self.log.info("Cluster is ready to provision compute.")
+ _create_compute(
+ compute=self.compute,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ wait_for_completion=self.wait_for_completion,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ nodegroup_name=self.nodegroup_name,
+ nodegroup_role_arn=self.nodegroup_role_arn,
+ create_nodegroup_kwargs=self.create_nodegroup_kwargs,
+ fargate_profile_name=self.fargate_profile_name,
+
fargate_pod_execution_role_arn=self.fargate_pod_execution_role_arn,
+ fargate_selectors=self.fargate_selectors,
+
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
+ subnets=cast(List[str],
self.resources_vpc_config.get("subnetIds")),
+ )
+ if self.compute == "fargate":
+ self.defer(
+ trigger=EksCreateFargateProfileTrigger(
+ cluster_name=self.cluster_name,
+ fargate_profile_name=self.fargate_profile_name,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+ else:
+ self.defer(
+ trigger=EksNodegroupTrigger(
+ waiter_name="nodegroup_active",
+ nodegroup_name=self.nodegroup_name,
+ cluster_name=self.cluster_name,
+ aws_conn_id=self.aws_conn_id,
+ region=self.region,
+ waiter_delay=self.waiter_delay,
+ waiter_max_attempts=self.waiter_max_attempts,
+ ),
+ method_name="execute_complete",
+ timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
+ )
+
+ def execute_failed(self, context, event=None):
Review Comment:
I saw that your PR got merged. I added the change here, but I was having
issues with mypy which didn't like the fact that event could be `None`:
```
airflow/providers/amazon/aws/operators/eks.py:766: error: Value of type
"Optional[Dict[str, Any]]" is not indexable [index]
if event["status"] == "success":
```
This can be solved by checking to make sure event is not None everytime:
```
if event and event["status"] == "success":
```
or remove `None` as an option and set the default to `{}`.
The last option is to simply ignore mypy, which is something I would rather
avoid if possible.
I'm going with removing `None` as an option, but let me know if you think
another option is more reasonable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]