syedahsn commented on code in PR #30853:
URL: https://github.com/apache/airflow/pull/30853#discussion_r1187018026
##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -524,64 +527,52 @@ def __init__(
         aws_conn_id: str = "aws_default",
         deferrable: bool = False,
         poll_interval: int = 10,
+        max_attempts: int = 15,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
         self.aws_conn_id = aws_conn_id
         self.deferrable = deferrable
+        self.max_attempts = max_attempts
         self.poll_interval = poll_interval
-        # These parameters are added to address an issue with the boto3 API where the API
+        # These parameters are used to address an issue with the boto3 API where the API
         # prematurely reports the cluster as available to receive requests. This causes the cluster
         # to reject initial attempts to pause the cluster despite reporting the correct state.
         self._attempts = 10
         self._attempt_interval = 15

     def execute(self, context: Context):
         redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        while self._attempts >= 1:
+            try:
+                redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
+                break
+            except redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
+                self._attempts = self._attempts - 1
+                if self._attempts > 0:
+                    self.log.error("Unable to pause cluster. %d attempts remaining.", self._attempts)
+                    time.sleep(self._attempt_interval)
+                else:
+                    raise error
         if self.deferrable:
             self.defer(
-                timeout=self.execution_timeout,
-                trigger=RedshiftClusterTrigger(
-                    task_id=self.task_id,
+                trigger=RedshiftPauseClusterTrigger(
+                    cluster_identifier=self.cluster_identifier,
                     poll_interval=self.poll_interval,
+                    max_attempts=self.max_attempts,
Review Comment:
I ended up going with adding a timeout, as you suggested, but I also kept the
code that tracks the number of attempts. The reason is that when a Trigger
times out, it does not yield a `TriggerEvent`. This makes the logs slightly
confusing, because it looks as though the Trigger succeeded.
```
[2023-05-08, 02:19:30 UTC] {credentials.py:617} INFO - Found credentials in shared credentials file: ~/.aws/credentials
[2023-05-08, 02:19:38 UTC] {redshift_cluster.py:203} INFO - Status of cluster is pausing
[2023-05-08, 02:19:57 UTC] {redshift_cluster.py:203} INFO - Status of cluster is pausing
[2023-05-08, 02:20:15 UTC] {redshift_cluster.py:203} INFO - Status of cluster is pausing
[2023-05-08, 02:20:28 UTC] {taskinstance.py:1144} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 [queued]>
[2023-05-08, 02:20:28 UTC] {taskinstance.py:1144} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 [queued]>
[2023-05-08, 02:20:28 UTC] {taskinstance.py:1343} INFO - Resuming after deferral
[2023-05-08, 02:20:28 UTC] {taskinstance.py:1364} INFO - Executing <Task(RedshiftPauseClusterOperator): pause_cluster> on 2023-05-08 02:11:38.809301+00:00
[2023-05-08, 02:20:28 UTC] {standard_task_runner.py:57} INFO - Started process 328 to run task
[2023-05-08, 02:20:28 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'example_redshift', 'pause_cluster', 'manual__2023-05-08T02:11:38.809301+00:00', '--job-id', '17', '--raw', '--subdir', 'DAGS_FOLDER/example_redshift.py', '--cfg-path', '/tmp/tmp582aoom9']
[2023-05-08, 02:20:28 UTC] {standard_task_runner.py:85} INFO - Job 17: Subtask pause_cluster
[2023-05-08, 02:20:28 UTC] {task_command.py:410} INFO - Running <TaskInstance: example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 [running]> on host 9ed7c7091244
[2023-05-08, 02:20:28 UTC] {taskinstance.py:1863} ERROR - Task failed with exception
airflow.exceptions.TaskDeferralError: Trigger/execution timeout
[2023-05-08, 02:20:28 UTC] {taskinstance.py:1382} INFO - Marking task as FAILED. dag_id=example_redshift, task_id=pause_cluster, execution_date=20230508T021138, start_date=20230508T021920, end_date=20230508T022028
```
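For context, a rough sketch of the operator-side callback that the deferral resumes into (the method name and payload keys here are assumptions for illustration, not the PR's exact code). It only runs when the trigger yields a `TriggerEvent`; on a bare timeout the task fails with `TaskDeferralError` before ever reaching it, which is why the log above shows no error detail from the trigger itself:
```python
from __future__ import annotations

import logging

from airflow.exceptions import AirflowException

log = logging.getLogger(__name__)


def execute_complete(self, context, event: dict | None = None) -> None:
    # Sketch of the resume callback as it would live on the operator class.
    # It is only reached when the trigger yields a TriggerEvent; a deferral
    # timeout bypasses it and raises TaskDeferralError instead, so no error
    # payload from the trigger ever reaches the task log.
    if event is None or event.get("status") != "success":
        raise AirflowException(f"Error while pausing cluster: {event}")
    log.info("Paused cluster %s successfully", self.cluster_identifier)
```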
I felt this was not a good user experience, so I decided to add the timeout
along with the attempt tracking. This way, under normal circumstances the
Trigger fails after exhausting the maximum number of attempts and yields a
`TriggerEvent` with an error message explaining the failure. If the Trigger is
killed and respawned, the timeout set by the operator still persists and fails
the Trigger at the correct time.
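To make that concrete, here is a minimal sketch of a trigger that fails by yielding an event rather than by relying on the timeout. The class name, the `_get_cluster_status` helper, and the payload keys are illustrative, not the provider's actual implementation:
```python
import asyncio

from airflow.triggers.base import BaseTrigger, TriggerEvent


class PauseClusterTriggerSketch(BaseTrigger):
    """Illustrative trigger that reports failure via a TriggerEvent."""

    def __init__(self, cluster_identifier: str, poll_interval: int, max_attempts: int):
        super().__init__()
        self.cluster_identifier = cluster_identifier
        self.poll_interval = poll_interval
        self.max_attempts = max_attempts

    def serialize(self):
        # Triggers must serialize to (classpath, kwargs) so the triggerer can rebuild them.
        return (
            "example.PauseClusterTriggerSketch",
            {
                "cluster_identifier": self.cluster_identifier,
                "poll_interval": self.poll_interval,
                "max_attempts": self.max_attempts,
            },
        )

    async def run(self):
        # Poll until the cluster reports "paused" or the attempts run out.
        # Yielding a TriggerEvent on failure is what surfaces the error in
        # the task log; a bare deferral timeout would end the task without
        # any event being emitted.
        for _ in range(self.max_attempts):
            status = await self._get_cluster_status()
            if status == "paused":
                yield TriggerEvent({"status": "success"})
                return
            await asyncio.sleep(self.poll_interval)
        yield TriggerEvent(
            {"status": "failure", "message": f"Pause exceeded {self.max_attempts} attempts"}
        )

    async def _get_cluster_status(self) -> str:
        # Placeholder: a real trigger would call the Redshift API through an
        # async hook here (e.g. describe_clusters).
        return "pausing"
```
With this shape, the deferral timeout on the operator only matters as a backstop, for example when the triggerer is restarted and the respawned trigger would otherwise run past the original deadline.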