This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7329e9eed9 Fix for `EksCreateClusterOperator` deferrable mode (#36079)
7329e9eed9 is described below
commit 7329e9eed91cc34a03c275dfc9a0fd4c9a0bba6f
Author: Syed Hussain <[email protected]>
AuthorDate: Thu Dec 7 12:47:12 2023 -0800
Fix for `EksCreateClusterOperator` deferrable mode (#36079)
---
airflow/providers/amazon/aws/operators/eks.py | 30 ++++++++++++++++++++-------
airflow/providers/amazon/aws/triggers/eks.py | 29 +++++++++++++++++++++++++-
2 files changed, 50 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/eks.py
b/airflow/providers/amazon/aws/operators/eks.py
index 8da9d7d8dd..e82a2a8811 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -21,6 +21,7 @@ import logging
import warnings
from ast import literal_eval
from datetime import timedelta
+from functools import cached_property
from typing import TYPE_CHECKING, Any, List, Sequence, cast
from botocore.exceptions import ClientError, WaiterError
@@ -257,6 +258,20 @@ class EksCreateClusterOperator(BaseOperator):
**kwargs,
)
+ @cached_property
+ def hook(self) -> EksHook:
+ return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
+
+ @property
+ def eks_hook(self):
+ warnings.warn(
+ "`eks_hook` property is deprecated and will be removed in the
future. "
+ "Please use `hook` property instead.",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ return self.hook
+
def execute(self, context: Context):
if self.compute:
if self.compute not in SUPPORTED_COMPUTE_VALUES:
@@ -271,8 +286,7 @@ class EksCreateClusterOperator(BaseOperator):
compute=FARGATE_FULL_NAME,
requirement="fargate_pod_execution_role_arn"
)
)
- self.eks_hook = EksHook(aws_conn_id=self.aws_conn_id,
region_name=self.region)
- self.eks_hook.create_cluster(
+ self.hook.create_cluster(
name=self.cluster_name,
roleArn=self.cluster_role_arn,
resourcesVpcConfig=self.resources_vpc_config,
@@ -285,7 +299,7 @@ class EksCreateClusterOperator(BaseOperator):
return None
self.log.info("Waiting for EKS Cluster to provision. This will take
some time.")
- client = self.eks_hook.conn
+ client = self.hook.conn
if self.deferrable:
self.defer(
@@ -307,7 +321,7 @@ class EksCreateClusterOperator(BaseOperator):
)
except (ClientError, WaiterError) as e:
self.log.error("Cluster failed to start and will be torn down.\n
%s", e)
- self.eks_hook.delete_cluster(name=self.cluster_name)
+ self.hook.delete_cluster(name=self.cluster_name)
client.get_waiter("cluster_deleted").wait(
name=self.cluster_name,
WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts":
self.waiter_max_attempts},
@@ -337,7 +351,7 @@ class EksCreateClusterOperator(BaseOperator):
raise AirflowException("Trigger error: event is None")
elif event["status"] == "failed":
self.log.error("Cluster failed to start and will be torn down.")
- self.eks_hook.delete_cluster(name=self.cluster_name)
+ self.hook.delete_cluster(name=self.cluster_name)
self.defer(
trigger=EksDeleteClusterTrigger(
cluster_name=self.cluster_name,
@@ -382,7 +396,7 @@ class EksCreateClusterOperator(BaseOperator):
method_name="execute_complete",
timeout=timedelta(seconds=self.waiter_max_attempts *
self.waiter_delay),
)
- else:
+ elif self.compute == "nodegroup":
self.defer(
trigger=EksCreateNodegroupTrigger(
nodegroup_name=self.nodegroup_name,
@@ -400,9 +414,9 @@ class EksCreateClusterOperator(BaseOperator):
if event is None:
self.log.info("Trigger error: event is None")
raise AirflowException("Trigger error: event is None")
- elif event["status"] == "delteted":
+ elif event["status"] == "deleted":
self.log.info("Cluster deleted")
- raise event["exception"]
+ raise AirflowException("Error creating cluster")
def execute_complete(self, context: Context, event: dict[str, Any] | None
= None) -> None:
resource = "fargate profile" if self.compute == "fargate" else
self.compute
diff --git a/airflow/providers/amazon/aws/triggers/eks.py
b/airflow/providers/amazon/aws/triggers/eks.py
index eb496acff8..03cef61506 100644
--- a/airflow/providers/amazon/aws/triggers/eks.py
+++ b/airflow/providers/amazon/aws/triggers/eks.py
@@ -19,7 +19,9 @@ from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Any
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.eks import EksHook
from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
from airflow.providers.amazon.aws.utils.waiter_with_logging import async_wait
@@ -68,6 +70,25 @@ class EksCreateClusterTrigger(AwsBaseWaiterTrigger):
def hook(self) -> AwsGenericHook:
return EksHook(aws_conn_id=self.aws_conn_id,
region_name=self.region_name)
+ async def run(self):
+ async with self.hook().async_conn as client:
+ waiter = client.get_waiter(self.waiter_name)
+ try:
+ await async_wait(
+ waiter,
+ self.waiter_delay,
+ self.attempts,
+ self.waiter_args,
+ self.failure_message,
+ self.status_message,
+ self.status_queries,
+ )
+ except AirflowException as exception:
+ self.log.error("Error creating cluster: %s", exception)
+ yield TriggerEvent({"status": "failed"})
+ else:
+ yield TriggerEvent({"status": "success"})
+
class EksDeleteClusterTrigger(AwsBaseWaiterTrigger):
"""
@@ -125,7 +146,13 @@ class EksDeleteClusterTrigger(AwsBaseWaiterTrigger):
if self.force_delete_compute:
await self.delete_any_nodegroups(client=client)
await self.delete_any_fargate_profiles(client=client)
+ try:
await client.delete_cluster(name=self.cluster_name)
+ except ClientError as ex:
+ if ex.response.get("Error").get("Code") ==
"ResourceNotFoundException":
+ pass
+ else:
+ raise
await async_wait(
waiter=waiter,
waiter_delay=int(self.waiter_delay),