ramitkataria commented on code in PR #68922:
URL: https://github.com/apache/airflow/pull/68922#discussion_r3471000847


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -871,8 +874,52 @@ def __init__(
         self._attempt_interval = 15
         self.deferrable = deferrable
         self.max_attempts = max_attempts
+        self.resume_if_paused = resume_if_paused
+
+    def _resume_if_paused(self) -> None:
+        """
+        Resume the cluster if it is paused.
+
+        A paused Redshift cluster cannot be deleted. If the cluster is 
currently paused, resume it
+        and wait until it reaches the ``available`` state before continuing.
+        """
+        # Gated behind the opt-in ``resume_if_paused`` flag: resume and delete 
are two separate,
+        # non-transactional AWS calls, so a failure between them would leave 
the cluster running.
+        try:
+            cluster_state = 
self.hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        except self.hook.conn.exceptions.ClusterNotFoundFault:
+            self.log.info(
+                "Cluster %s not found while checking whether resume is 
required.",
+                self.cluster_identifier,
+            )
+            return
+
+        if cluster_state != "paused":
+            self.log.info(
+                "Cluster %s is in state %s; skipping resume.",
+                self.cluster_identifier,
+                cluster_state,
+            )
+            return
+
+        self.log.info(
+            "Cluster %s is paused; resuming it before deletion (a paused 
cluster cannot be deleted).",
+            self.cluster_identifier,
+        )
+        
self.hook.conn.resume_cluster(ClusterIdentifier=self.cluster_identifier)
+        self.hook.conn.get_waiter("cluster_available").wait(
+            ClusterIdentifier=self.cluster_identifier,
+            WaiterConfig={"Delay": self.poll_interval, "MaxAttempts": 
self.max_attempts},
+        )
 
     def execute(self, context: Context):
+        # A paused cluster cannot be deleted; optionally resume it first 
(otherwise the retry loop
+        # below would exhaust against InvalidClusterStateFault and the cluster 
would be leaked).
+        # Opt-in (resume_if_paused) because resume+delete is not transactional 
-- see
+        # _resume_if_paused.
+        if self.resume_if_paused:
+            self._resume_if_paused()

Review Comment:
   Wouldn't this add an upto 15 min block in the deferrable case, blocking the 
event loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to