pankajkoti commented on code in PR #36416:
URL: https://github.com/apache/airflow/pull/36416#discussion_r1436303356
##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -535,19 +535,27 @@ def execute(self, context: Context):
time.sleep(self._attempt_interval)
else:
raise error
+
if self.deferrable:
- self.defer(
- trigger=RedshiftResumeClusterTrigger(
- cluster_identifier=self.cluster_identifier,
- waiter_delay=self.poll_interval,
- waiter_max_attempts=self.max_attempts,
- aws_conn_id=self.aws_conn_id,
- ),
- method_name="execute_complete",
- # timeout is set to ensure that if a trigger dies, the timeout
does not restart
- # 60 seconds is added to allow the trigger to exit gracefully
(i.e. yield TriggerEvent)
- timeout=timedelta(seconds=self.max_attempts *
self.poll_interval + 60),
+ cluster_state =
redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+ if cluster_state == "paused":
+ self.defer(
+ trigger=RedshiftResumeClusterTrigger(
+ cluster_identifier=self.cluster_identifier,
+ waiter_delay=self.poll_interval,
+ waiter_max_attempts=self.max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ # timeout is set to ensure that if a trigger dies, the
timeout does not restart
+ # 60 seconds is added to allow the trigger to exit
gracefully (i.e. yield TriggerEvent)
+ timeout=timedelta(seconds=self.max_attempts *
self.poll_interval + 60),
+ )
+
+ self.log.warning(
Review Comment:
Should we raise an exception here instead? The user intends to resume the
cluster, so if the cluster is not in the state the user expects, shouldn't we
alert them? Otherwise, the task would silently succeed.
I see we're raising an AirflowException in the RedshiftDeleteClusterOperator
when the state is not as expected.
##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -624,25 +635,34 @@ def execute(self, context: Context):
else:
raise error
if self.deferrable:
- self.defer(
- trigger=RedshiftPauseClusterTrigger(
- cluster_identifier=self.cluster_identifier,
- waiter_delay=self.poll_interval,
- waiter_max_attempts=self.max_attempts,
- aws_conn_id=self.aws_conn_id,
- ),
- method_name="execute_complete",
- # timeout is set to ensure that if a trigger dies, the timeout
does not restart
- # 60 seconds is added to allow the trigger to exit gracefully
(i.e. yield TriggerEvent)
- timeout=timedelta(seconds=self.max_attempts *
self.poll_interval + 60),
+ cluster_state =
redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+ if cluster_state == "available":
+ self.defer(
+ trigger=RedshiftPauseClusterTrigger(
+ cluster_identifier=self.cluster_identifier,
+ waiter_delay=self.poll_interval,
+ waiter_max_attempts=self.max_attempts,
+ aws_conn_id=self.aws_conn_id,
+ ),
+ method_name="execute_complete",
+ # timeout is set to ensure that if a trigger dies, the
timeout does not restart
+ # 60 seconds is added to allow the trigger to exit
gracefully (i.e. yield TriggerEvent)
+ timeout=timedelta(seconds=self.max_attempts *
self.poll_interval + 60),
+ )
+
+ self.log.warning(
Review Comment:
Same question here: should we raise an exception instead of only logging a
warning when the cluster is not in the expected state?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]