Taragolis commented on code in PR #29761:
URL: https://github.com/apache/airflow/pull/29761#discussion_r1117917750
##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -174,30 +206,53 @@ class
EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
:param task_definition: The family and revision (family:revision) or full
Amazon Resource Name (ARN)
of the task definition to deregister. If you use a family name, you
must specify a revision.
:param wait_for_completion: If True, waits for creation of the cluster to
complete. (default: True)
+ :param waiter_delay: The amount of time in seconds to wait between
attempts,
+ if not set then default waiter value will use.
+ :param waiter_max_attempts: The maximum number of attempts to be made,
+ if not set then default waiter value will use.
"""
template_fields: Sequence[str] = ("task_definition", "wait_for_completion")
- def __init__(self, *, task_definition: str, wait_for_completion: bool =
True, **kwargs):
+ def __init__(
+ self,
+ *,
+ task_definition: str,
+ wait_for_completion: bool = True,
+ waiter_delay: int | None = None,
+ waiter_max_attempts: int | None = None,
+ **kwargs,
+ ):
super().__init__(**kwargs)
self.task_definition = task_definition
self.wait_for_completion = wait_for_completion
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
def execute(self, context: Context):
self.log.info("Deregistering task definition %s.",
self.task_definition)
result =
self.client.deregister_task_definition(taskDefinition=self.task_definition)
-
- if self.wait_for_completion:
- while not EcsTaskDefinitionStateSensor(
- task_id="await_deregister_task_definition",
- task_definition=self.task_definition,
- target_state=EcsTaskDefinitionStates.INACTIVE,
- ).poke(context):
- # The sensor has a built-in delay and will try again until the
- # task definition is deregistered or reaches a failed state.
- pass
-
- return result["taskDefinition"]["taskDefinitionArn"]
+ task_definition_details = result["taskDefinition"]
+ task_definition_arn = task_definition_details["taskDefinitionArn"]
+ task_definition_state = task_definition_details.get("status")
+
+ if task_definition_state == EcsTaskDefinitionStates.INACTIVE:
+ # In some circumstances ECS Task Definition deleted immediately,
+ # and there is no reason wait for completion.
Review Comment:
To be honest this happen almost always 🤣
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]