syedahsn commented on code in PR #30853:
URL: https://github.com/apache/airflow/pull/30853#discussion_r1187018026


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -524,64 +527,52 @@ def __init__(
         aws_conn_id: str = "aws_default",
         deferrable: bool = False,
         poll_interval: int = 10,
+        max_attempts: int = 15,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
         self.aws_conn_id = aws_conn_id
         self.deferrable = deferrable
+        self.max_attempts = max_attempts
         self.poll_interval = poll_interval
-        # These parameters are added to address an issue with the boto3 API where the API
+        # These parameters are used to address an issue with the boto3 API where the API
         # prematurely reports the cluster as available to receive requests. This causes the cluster
         # to reject initial attempts to pause the cluster despite reporting the correct state.
         self._attempts = 10
         self._attempt_interval = 15
 
     def execute(self, context: Context):
         redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        while self._attempts >= 1:
+            try:
+                redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
+                break
+            except redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
+                self._attempts = self._attempts - 1
 
+                if self._attempts > 0:
+                    self.log.error("Unable to pause cluster. %d attempts remaining.", self._attempts)
+                    time.sleep(self._attempt_interval)
+                else:
+                    raise error
         if self.deferrable:
             self.defer(
-                timeout=self.execution_timeout,
-                trigger=RedshiftClusterTrigger(
-                    task_id=self.task_id,
+                trigger=RedshiftPauseClusterTrigger(
+                    cluster_identifier=self.cluster_identifier,
                     poll_interval=self.poll_interval,
+                    max_attempts=self.max_attempts,

Review Comment:
   I ended up adding a timeout, as you suggested, but I also left in the code that tracks the number of attempts. The reason is that when a Trigger times out, it does not yield a `TriggerEvent`, which makes the logs slightly confusing because it looks like the Trigger succeeded.
   ```
   [2023-05-08, 02:19:30 UTC] {credentials.py:617} INFO - Found credentials in shared credentials file: ~/.aws/credentials
   [2023-05-08, 02:19:38 UTC] {redshift_cluster.py:203} INFO - Status of cluster is pausing
   [2023-05-08, 02:19:57 UTC] {redshift_cluster.py:203} INFO - Status of cluster is pausing
   [2023-05-08, 02:20:15 UTC] {redshift_cluster.py:203} INFO - Status of cluster is pausing
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1144} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 [queued]>
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1144} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 [queued]>
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1343} INFO - Resuming after deferral
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1364} INFO - Executing <Task(RedshiftPauseClusterOperator): pause_cluster> on 2023-05-08 02:11:38.809301+00:00
   [2023-05-08, 02:20:28 UTC] {standard_task_runner.py:57} INFO - Started process 328 to run task
   [2023-05-08, 02:20:28 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'example_redshift', 'pause_cluster', 'manual__2023-05-08T02:11:38.809301+00:00', '--job-id', '17', '--raw', '--subdir', 'DAGS_FOLDER/example_redshift.py', '--cfg-path', '/tmp/tmp582aoom9']
   [2023-05-08, 02:20:28 UTC] {standard_task_runner.py:85} INFO - Job 17: Subtask pause_cluster
   [2023-05-08, 02:20:28 UTC] {task_command.py:410} INFO - Running <TaskInstance: example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 [running]> on host 9ed7c7091244
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1863} ERROR - Task failed with exception
   airflow.exceptions.TaskDeferralError: Trigger/execution timeout
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1382} INFO - Marking task as FAILED. dag_id=example_redshift, task_id=pause_cluster, execution_date=20230508T021138, start_date=20230508T021920, end_date=20230508T022028
   ```
   I felt this was not a good user experience, so I decided to add the timeout along with the attempt tracking. This way, under normal circumstances, the Trigger fails after exhausting its maximum number of attempts and yields a `TriggerEvent` with an error message explaining the failure. In the case where the Trigger is killed and respawned, the timeout set by the operator persists and fails the Trigger at the correct time.
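   To sketch the shape of what I mean (illustrative only, not the exact code in this PR; the class name, module path, and status check below are placeholders):
   ```
   from __future__ import annotations

   import asyncio
   from typing import Any, AsyncIterator

   from airflow.triggers.base import BaseTrigger, TriggerEvent


   class PauseClusterTriggerSketch(BaseTrigger):
       """Counts its own attempts and yields an explicit failure TriggerEvent
       when they run out, instead of relying solely on the deferral timeout."""

       def __init__(self, cluster_identifier: str, poll_interval: int, max_attempts: int):
           super().__init__()
           self.cluster_identifier = cluster_identifier
           self.poll_interval = poll_interval
           self.max_attempts = max_attempts

       def serialize(self) -> tuple[str, dict[str, Any]]:
           return (
               "example_module.PauseClusterTriggerSketch",  # placeholder module path
               {
                   "cluster_identifier": self.cluster_identifier,
                   "poll_interval": self.poll_interval,
                   "max_attempts": self.max_attempts,
               },
           )

       async def _cluster_is_paused(self) -> bool:
           # Placeholder: a real trigger would check the cluster status via the
           # Redshift API here.
           return False

       async def run(self) -> AsyncIterator[TriggerEvent]:
           attempts = self.max_attempts
           while attempts >= 1:
               attempts -= 1
               if await self._cluster_is_paused():
                   yield TriggerEvent({"status": "success", "message": "Cluster paused"})
                   return
               if attempts > 0:
                   await asyncio.sleep(self.poll_interval)
           # Attempts exhausted: yield a failure event so the task log explains
           # what went wrong instead of showing only a bare TaskDeferralError.
           yield TriggerEvent({"status": "failure", "message": "Pause cluster failed - max attempts reached."})
   ```
   On the operator side, the deferral timeout can then be sized from the same knobs, e.g. roughly `timedelta(seconds=max_attempts * poll_interval)` plus a small buffer, so that under normal operation the Trigger exhausts its attempts (and yields the failure event) before the timeout fires; the timeout only matters when the Trigger is killed and respawned.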


