ramitkataria commented on code in PR #68922:
URL: https://github.com/apache/airflow/pull/68922#discussion_r3471000847
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -871,8 +874,52 @@ def __init__(
self._attempt_interval = 15
self.deferrable = deferrable
self.max_attempts = max_attempts
+ self.resume_if_paused = resume_if_paused
+
+ def _resume_if_paused(self) -> None:
+ """
+ Resume the cluster if it is paused.
+
+ A paused Redshift cluster cannot be deleted. If the cluster is
currently paused, resume it
+ and wait until it reaches the ``available`` state before continuing.
+ """
+ # Gated behind the opt-in ``resume_if_paused`` flag: resume and delete
are two separate,
+ # non-transactional AWS calls, so a failure between them would leave
the cluster running.
+ try:
+ cluster_state =
self.hook.cluster_status(cluster_identifier=self.cluster_identifier)
+ except self.hook.conn.exceptions.ClusterNotFoundFault:
+ self.log.info(
+ "Cluster %s not found while checking whether resume is
required.",
+ self.cluster_identifier,
+ )
+ return
+
+ if cluster_state != "paused":
+ self.log.info(
+ "Cluster %s is in state %s; skipping resume.",
+ self.cluster_identifier,
+ cluster_state,
+ )
+ return
+
+ self.log.info(
+ "Cluster %s is paused; resuming it before deletion (a paused
cluster cannot be deleted).",
+ self.cluster_identifier,
+ )
+
self.hook.conn.resume_cluster(ClusterIdentifier=self.cluster_identifier)
+ self.hook.conn.get_waiter("cluster_available").wait(
+ ClusterIdentifier=self.cluster_identifier,
+ WaiterConfig={"Delay": self.poll_interval, "MaxAttempts":
self.max_attempts},
+ )
def execute(self, context: Context):
+ # A paused cluster cannot be deleted; optionally resume it first
(otherwise the retry loop
+ # below would exhaust against InvalidClusterStateFault and the cluster
would be leaked).
+ # Opt-in (resume_if_paused) because resume+delete is not transactional
-- see
+ # _resume_if_paused.
+ if self.resume_if_paused:
+ self._resume_if_paused()
Review Comment:
Wouldn't this add an upto 15 min block in the deferrable case, defeating the
purpose of deferrable mode?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]