eladkal commented on code in PR #27276:
URL: https://github.com/apache/airflow/pull/27276#discussion_r1008350886
##########
airflow/providers/amazon/aws/hooks/redshift_cluster.py:
##########
@@ -157,16 +158,24 @@ def create_cluster_snapshot(
)
return response["Snapshot"] if response["Snapshot"] else None
- def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str):
+ def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str | None = None):
"""
Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``
:param snapshot_identifier: A unique identifier for the snapshot that you are requesting
- :param cluster_identifier: The unique identifier of the cluster the snapshot was created from
+ :param cluster_identifier: (deprecated) The unique identifier of the cluster
+ the snapshot was created from
Review Comment:
Can you please share some context?
The PR description says the motivation is issues with the system tests, but
is there more to it?
Why are we deprecating this parameter? If boto3 accepts it as valid input,
why should we prevent users from using it?
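
To illustrate the alternative to deprecation: since the boto3 `describe_cluster_snapshots` call takes both `SnapshotIdentifier` and `ClusterIdentifier` as optional filters, the hook could forward `cluster_identifier` only when the caller supplies it. This is a rough sketch, not the provider's code; `build_snapshot_query` is a hypothetical helper name.

```python
from typing import Dict, Optional


def build_snapshot_query(
    snapshot_identifier: str,
    cluster_identifier: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical helper: build keyword arguments for a
    describe_cluster_snapshots-style call."""
    kwargs = {"SnapshotIdentifier": snapshot_identifier}
    # Forward the cluster filter only when the caller provided one,
    # so the parameter stays optional without being deprecated.
    if cluster_identifier is not None:
        kwargs["ClusterIdentifier"] = cluster_identifier
    return kwargs
```

The resulting dict could then be unpacked into the client call, e.g. `client.describe_cluster_snapshots(**kwargs)`.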
##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -392,20 +393,31 @@ def __init__(
*,
cluster_identifier: str,
aws_conn_id: str = "aws_default",
+ attempts: int = 1,
+ attempt_interval: int = 30,
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
self.aws_conn_id = aws_conn_id
+ self.attempts = attempts
+ self.attempt_interval = attempt_interval
def execute(self, context: Context):
redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
- cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
- if cluster_state == "paused":
- self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
- redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
- else:
- raise Exception(f"Unable to resume cluster - cluster state is {cluster_state}")
+
+ while self.attempts >= 1:
+ try:
+ redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
+ return
+ except redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
+ self.attempts = self.attempts - 1
+
+ if self.attempts > 0:
+ self.log.error("Unable to resume cluster. %d attempts remaining.", self.attempts)
+ time.sleep(self.attempt_interval)
+ else:
+ raise error
Review Comment:
This is essentially the behavior of `retries` and `retry_delay` in
BaseOperator.
Why do we need another layer of "internal retries"?
I would argue that if we have such a need, it possibly goes beyond this
single operator and may apply to others.
If we must have it, let's not expose the parameters to users. I'm not sure
we want each individual operator to define its own user-facing retry logic.
I think this should be discussed in more depth. I'd like to hear from
others on this one.
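
If an internal retry loop is kept at all, one possible shape is a small private helper that tracks the attempt count in a local variable, so nothing is exposed as an operator argument and no instance state is mutated between runs. This is only a sketch under assumptions: `call_with_retries` and `InvalidStateError` are hypothetical names, with the latter standing in for boto3's `InvalidClusterStateFault`.

```python
import time
from typing import Callable, Type


class InvalidStateError(Exception):
    """Hypothetical stand-in for boto3's InvalidClusterStateFault."""


def call_with_retries(
    action: Callable[[], None],
    retry_on: Type[Exception],
    attempts: int = 3,
    interval: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run ``action`` up to ``attempts`` times and return the attempt
    number that succeeded. Only ``retry_on`` errors are retried."""
    for attempt in range(1, attempts + 1):
        try:
            action()
            return attempt
        except retry_on:
            # Re-raise on the last attempt; otherwise wait and retry.
            if attempt == attempts:
                raise
            sleep(interval)
    # Reached only when the loop never ran.
    raise ValueError("attempts must be >= 1")
```

Because the counter is local to the call, a second task-level retry (via `retries` and `retry_delay`) would start with a full set of attempts, which the `self.attempts` countdown in the diff would not do.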