vandonr-amz commented on code in PR #32274:
URL: https://github.com/apache/airflow/pull/32274#discussion_r1253598328
##########
airflow/providers/amazon/aws/triggers/eks.py:
##########
@@ -115,59 +90,37 @@ def __init__(
waiter_max_attempts: int,
aws_conn_id: str,
region: str | None = None,
+ region_name: str | None = None,
):
- self.cluster_name = cluster_name
- self.fargate_profile_name = fargate_profile_name
- self.waiter_delay = waiter_delay
- self.waiter_max_attempts = waiter_max_attempts
- self.aws_conn_id = aws_conn_id
- self.region = region
-
- def serialize(self) -> tuple[str, dict[str, Any]]:
- return (
- self.__class__.__module__ + "." + self.__class__.__qualname__,
- {
- "cluster_name": self.cluster_name,
- "fargate_profile_name": self.fargate_profile_name,
- "waiter_delay": str(self.waiter_delay),
- "waiter_max_attempts": str(self.waiter_max_attempts),
- "aws_conn_id": self.aws_conn_id,
- "region": self.region,
- },
+ if region is not None:
+ warnings.warn(
+ "please use region_name param instead of region",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ region_name = region
+
+ super().__init__(
+ serialized_fields={"cluster_name": cluster_name,
"fargate_profile_name": fargate_profile_name},
+ waiter_name="fargate_profile_deleted",
+ waiter_args={"clusterName": cluster_name, "fargateProfileName":
fargate_profile_name},
+ failure_message="Failure while deleting Fargate profile",
+ status_message="Fargate profile not deleted yet",
+ status_queries=["fargateProfile.status"],
+ return_value=None,
+ waiter_delay=waiter_delay,
+ waiter_max_attempts=waiter_max_attempts,
+ aws_conn_id=aws_conn_id,
+ region_name=region_name,
)
- async def run(self):
- self.hook = EksHook(aws_conn_id=self.aws_conn_id,
region_name=self.region)
- async with self.hook.async_conn as client:
- attempt = 0
- waiter = client.get_waiter("fargate_profile_deleted")
- while attempt < int(self.waiter_max_attempts):
- attempt += 1
- try:
- await waiter.wait(
- clusterName=self.cluster_name,
- fargateProfileName=self.fargate_profile_name,
- WaiterConfig={"Delay": int(self.waiter_delay),
"MaxAttempts": 1},
- )
- break
- except WaiterError as error:
- if "terminal failure" in str(error):
- raise AirflowException(f"Delete Fargate Profile
failed: {error}")
- self.log.info(
- "Status of fargate profile is %s",
error.last_response["fargateProfile"]["status"]
- )
- await asyncio.sleep(int(self.waiter_delay))
- if attempt >= int(self.waiter_max_attempts):
- raise AirflowException(
- f"Delete Fargate Profile failed - max attempts reached:
{self.waiter_max_attempts}"
- )
- else:
- yield TriggerEvent({"status": "success", "message": "Fargate
Profile Deleted"})
+ def hook(self) -> AwsGenericHook:
+ return EksHook(aws_conn_id=self.aws_conn_id,
region_name=self.region_name)
-class EksNodegroupTrigger(BaseTrigger):
+class EksCreateNodegroupTrigger(AwsBaseWaiterTrigger):
Review Comment:
I remember you said back in the day that you preferred separate triggers so
that the status/failure messages can be more descriptive ;)
Also, given the smaller footprint of triggers created this way, I think it's
OK to have a dedicated trigger for each operation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]