vincbeck commented on code in PR #23563:
URL: https://github.com/apache/airflow/pull/23563#discussion_r868217405
##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -317,3 +318,55 @@ def execute(self, context: 'Context'):
self.log.warning(
"Unable to pause cluster since cluster is currently in status:
%s", cluster_state
)
+
+
+class RedshiftDeleteClusterOperator(BaseOperator):
+ """
+ Delete an AWS Redshift cluster.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:RedshiftDeleteClusterOperator`
+
+ :param cluster_identifier: unique identifier of a cluster
+ :param skip_final_cluster_snapshot: determines cluster snapshot creation
+ :param final_cluster_snapshot_identifier: name of final cluster snapshot
+ :param aws_conn_id: aws connection to use
+ :param poll_interval: Time (in seconds) to wait between two consecutive
calls to check cluster state
+ """
+
+ template_fields: Sequence[str] = ("cluster_identifier",)
+ ui_color = "#eeaa11"
+ ui_fgcolor = "#ffffff"
+
+ def __init__(
+ self,
+ *,
+ cluster_identifier: str,
+ skip_final_cluster_snapshot: bool = True,
+ final_cluster_snapshot_identifier: Optional[str] = None,
+ aws_conn_id: str = "aws_default",
+ poll_interval: float = 30.0,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.cluster_identifier = cluster_identifier
+ self.skip_final_cluster_snapshot = skip_final_cluster_snapshot
+ self.final_cluster_snapshot_identifier =
final_cluster_snapshot_identifier
+ self.aws_conn_id = aws_conn_id
+ self.poll_interval = poll_interval
+
+ def execute(self, context: 'Context'):
+ redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+ redshift_hook.delete_cluster(
+ cluster_identifier=self.cluster_identifier,
+ skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
+
final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
+ )
+ cluster_status: str =
redshift_hook.cluster_status(self.cluster_identifier)
+ while cluster_status != "cluster_not_found":
Review Comment:
I would add a parameter to the operator named `wait_for_completion` and only
if this parameter is true we would check the status
##########
docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst:
##########
@@ -89,6 +89,20 @@ To pause an 'available' Amazon Redshift Cluster you can use
:start-after: [START howto_operator_redshift_pause_cluster]
:end-before: [END howto_operator_redshift_pause_cluster]
+.. _howto/operator:RedshiftDeleteClusterOperator:
+
+Delete an Amazon Redshift Cluster
+"""""""""""""""""""""""""""""""""
+
+To delete an Amazon Redshift Cluster
Review Comment:
```suggestion
To delete an Amazon Redshift Cluster you can use
```
##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -317,3 +318,55 @@ def execute(self, context: 'Context'):
self.log.warning(
"Unable to pause cluster since cluster is currently in status:
%s", cluster_state
)
+
+
+class RedshiftDeleteClusterOperator(BaseOperator):
+ """
+ Delete an AWS Redshift cluster.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:RedshiftDeleteClusterOperator`
+
+ :param cluster_identifier: unique identifier of a cluster
+ :param skip_final_cluster_snapshot: determines cluster snapshot creation
+ :param final_cluster_snapshot_identifier: name of final cluster snapshot
+ :param aws_conn_id: aws connection to use
+ :param poll_interval: Time (in seconds) to wait between two consecutive
calls to check cluster state
+ """
+
+ template_fields: Sequence[str] = ("cluster_identifier",)
+ ui_color = "#eeaa11"
+ ui_fgcolor = "#ffffff"
+
+ def __init__(
+ self,
+ *,
+ cluster_identifier: str,
+ skip_final_cluster_snapshot: bool = True,
+ final_cluster_snapshot_identifier: Optional[str] = None,
+ aws_conn_id: str = "aws_default",
+ poll_interval: float = 30.0,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.cluster_identifier = cluster_identifier
+ self.skip_final_cluster_snapshot = skip_final_cluster_snapshot
+ self.final_cluster_snapshot_identifier =
final_cluster_snapshot_identifier
+ self.aws_conn_id = aws_conn_id
+ self.poll_interval = poll_interval
+
+ def execute(self, context: 'Context'):
+ redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+ redshift_hook.delete_cluster(
+ cluster_identifier=self.cluster_identifier,
+ skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
+
final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
+ )
+ cluster_status: str =
redshift_hook.cluster_status(self.cluster_identifier)
Review Comment:
nit. Could you please separate the 2 functions `delete_cluster` and
`check_status` in 2 different methods. That would make the code cleaner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]