syedahsn commented on code in PR #32274:
URL: https://github.com/apache/airflow/pull/32274#discussion_r1253633598
##########
airflow/providers/amazon/aws/triggers/eks.py:
##########
@@ -115,59 +90,37 @@ def __init__(
waiter_max_attempts: int,
aws_conn_id: str,
region: str | None = None,
+ region_name: str | None = None,
):
- self.cluster_name = cluster_name
- self.fargate_profile_name = fargate_profile_name
- self.waiter_delay = waiter_delay
- self.waiter_max_attempts = waiter_max_attempts
- self.aws_conn_id = aws_conn_id
- self.region = region
-
- def serialize(self) -> tuple[str, dict[str, Any]]:
- return (
- self.__class__.__module__ + "." + self.__class__.__qualname__,
- {
- "cluster_name": self.cluster_name,
- "fargate_profile_name": self.fargate_profile_name,
- "waiter_delay": str(self.waiter_delay),
- "waiter_max_attempts": str(self.waiter_max_attempts),
- "aws_conn_id": self.aws_conn_id,
- "region": self.region,
- },
+ if region is not None:
+ warnings.warn(
+ "please use region_name param instead of region",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ region_name = region
+
+ super().__init__(
+ serialized_fields={"cluster_name": cluster_name,
"fargate_profile_name": fargate_profile_name},
+ waiter_name="fargate_profile_deleted",
+ waiter_args={"clusterName": cluster_name, "fargateProfileName":
fargate_profile_name},
+ failure_message="Failure while deleting Fargate profile",
+ status_message="Fargate profile not deleted yet",
+ status_queries=["fargateProfile.status"],
+ return_value=None,
+ waiter_delay=waiter_delay,
+ waiter_max_attempts=waiter_max_attempts,
+ aws_conn_id=aws_conn_id,
+ region_name=region_name,
)
- async def run(self):
- self.hook = EksHook(aws_conn_id=self.aws_conn_id,
region_name=self.region)
- async with self.hook.async_conn as client:
- attempt = 0
- waiter = client.get_waiter("fargate_profile_deleted")
- while attempt < int(self.waiter_max_attempts):
- attempt += 1
- try:
- await waiter.wait(
- clusterName=self.cluster_name,
- fargateProfileName=self.fargate_profile_name,
- WaiterConfig={"Delay": int(self.waiter_delay),
"MaxAttempts": 1},
- )
- break
- except WaiterError as error:
- if "terminal failure" in str(error):
- raise AirflowException(f"Delete Fargate Profile
failed: {error}")
- self.log.info(
- "Status of fargate profile is %s",
error.last_response["fargateProfile"]["status"]
- )
- await asyncio.sleep(int(self.waiter_delay))
- if attempt >= int(self.waiter_max_attempts):
- raise AirflowException(
- f"Delete Fargate Profile failed - max attempts reached:
{self.waiter_max_attempts}"
- )
- else:
- yield TriggerEvent({"status": "success", "message": "Fargate
Profile Deleted"})
+ def hook(self) -> AwsGenericHook:
+ return EksHook(aws_conn_id=self.aws_conn_id,
region_name=self.region_name)
-class EksNodegroupTrigger(BaseTrigger):
+class EksCreateNodegroupTrigger(AwsBaseWaiterTrigger):
Review Comment:
Ah, I ended up adopting your method haha. Alright, so just to be clear, I am
going to create a separate Trigger for each operator again.
> Also, given the lower footprint of triggers created this way, I think it's
ok to have specific triggers for each thing.
Definitely. Big +1 for reducing the repeated code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]