pankajkoti commented on code in PR #36416:
URL: https://github.com/apache/airflow/pull/36416#discussion_r1436423823


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -714,26 +738,41 @@ def execute(self, context: Context):
                     time.sleep(self._attempt_interval)
                 else:
                     raise
+
         if self.deferrable:
-            self.defer(
-                timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60),
-                trigger=RedshiftDeleteClusterTrigger(
-                    cluster_identifier=self.cluster_identifier,
-                    waiter_delay=self.poll_interval,
-                    waiter_max_attempts=self.max_attempts,
-                    aws_conn_id=self.aws_conn_id,
-                ),
-                method_name="execute_complete",
-            )
+            cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+            if cluster_state == "cluster_not_found":
+                self.log.info("Cluster deleted successfully")
+            elif cluster_state in ("creating", "modifying"):
+                raise AirflowException(
+                    f"Unable to delete cluster since cluster is currently in status: {cluster_state}"
+                )
+            else:

Review Comment:
   Similar comment as above: the `else` clause could be a broader catch-all that raises an exception for all other states.
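
   As a hedged illustration only, the broader-`else` structure being suggested might look like the sketch below. The helper `handle_delete_cluster_state`, the `defer` callback, and the use of plain `RuntimeError` in place of `AirflowException` are all hypothetical stand-ins, not the actual PR code:

   ```python
   # Hypothetical sketch of the suggested structure for the delete path.
   # State names follow the AWS Redshift cluster-status docs; "defer" is a
   # stand-in callback, not Airflow's BaseOperator.defer.
   def handle_delete_cluster_state(cluster_state: str, defer) -> str:
       if cluster_state == "cluster_not_found":
           return "Cluster deleted successfully"
       if cluster_state == "deleting":
           # deletion still in progress: hand off to the trigger and wait
           defer()
           return "deferred"
       # broader else: creating, modifying, or any other state is an error here
       raise RuntimeError(
           f"Unable to delete cluster since cluster is currently in status: {cluster_state}"
       )
   ```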



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -535,19 +535,29 @@ def execute(self, context: Context):
                     time.sleep(self._attempt_interval)
                 else:
                     raise error
+
         if self.deferrable:
-            self.defer(
-                trigger=RedshiftResumeClusterTrigger(
-                    cluster_identifier=self.cluster_identifier,
-                    waiter_delay=self.poll_interval,
-                    waiter_max_attempts=self.max_attempts,
-                    aws_conn_id=self.aws_conn_id,
-                ),
-                method_name="execute_complete",
-                # timeout is set to ensure that if a trigger dies, the timeout does not restart
-                # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
-                timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60),
-            )
+            cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+            if cluster_state == "available":
+                self.log.info("Resumed cluster successfully")
+            elif cluster_state == "deleting":
+                raise AirflowException(
+                    "Unable to resume cluster since cluster is currently in status: %s", cluster_state
+                )
+            else:

Review Comment:
   Since there are many possible cluster states per https://docs.aws.amazon.com/redshift/latest/mgmt/working-with-clusters.html#rs-mgmt-cluster-status, we should explicitly add an `elif` check here (probably on `paused`) to verify the cluster is in a resumable state, and have a broader `else` clause that raises an exception instead.
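
   As a hedged illustration only, the explicit-`elif` structure being suggested might look like the sketch below. The helper `handle_resume_cluster_state`, the `defer` callback, and the use of `RuntimeError` in place of `AirflowException` are hypothetical stand-ins, not the actual PR code:

   ```python
   # Hypothetical sketch of the suggested structure for the resume path.
   # State names follow the AWS Redshift cluster-status docs; "defer" is a
   # stand-in callback, not Airflow's BaseOperator.defer.
   def handle_resume_cluster_state(cluster_state: str, defer) -> str:
       if cluster_state == "available":
           return "Resumed cluster successfully"
       if cluster_state == "paused":
           # explicitly confirmed resumable: hand off to the trigger and wait
           defer()
           return "deferred"
       # broader else: deleting, modifying, or any other state is an error here
       raise RuntimeError(
           f"Unable to resume cluster since cluster is currently in status: {cluster_state}"
       )
   ```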



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -624,25 +637,36 @@ def execute(self, context: Context):
                 else:
                     raise error
         if self.deferrable:
-            self.defer(
-                trigger=RedshiftPauseClusterTrigger(
-                    cluster_identifier=self.cluster_identifier,
-                    waiter_delay=self.poll_interval,
-                    waiter_max_attempts=self.max_attempts,
-                    aws_conn_id=self.aws_conn_id,
-                ),
-                method_name="execute_complete",
-                # timeout is set to ensure that if a trigger dies, the timeout does not restart
-                # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
-                timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60),
-            )
+            cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+            if cluster_state == "paused":
+                self.log.info("Paused cluster successfully")
+            elif cluster_state == "deleting":
+                raise AirflowException(
+                    f"Unable to pause cluster since cluster is currently in status: {cluster_state}"
+                )
+            else:

Review Comment:
   Similar comment as above: the `else` clause could be a broader catch-all that raises an exception for all other states.
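
   As a hedged illustration only, the broader-`else` structure being suggested might look like the sketch below. The helper `handle_pause_cluster_state`, the `defer` callback, and the use of `RuntimeError` in place of `AirflowException` are hypothetical stand-ins, not the actual PR code:

   ```python
   # Hypothetical sketch of the suggested structure for the pause path.
   # State names follow the AWS Redshift cluster-status docs; "defer" is a
   # stand-in callback, not Airflow's BaseOperator.defer.
   def handle_pause_cluster_state(cluster_state: str, defer) -> str:
       if cluster_state == "paused":
           return "Paused cluster successfully"
       if cluster_state == "available":
           # explicitly confirmed pausable: hand off to the trigger and wait
           defer()
           return "deferred"
       # broader else: deleting, modifying, or any other state is an error here
       raise RuntimeError(
           f"Unable to pause cluster since cluster is currently in status: {cluster_state}"
       )
   ```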



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
