This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0036ef7d35 Add deferrable mode to `RedshiftResumeClusterOperator` 
(#30090)
0036ef7d35 is described below

commit 0036ef7d35b1a5f654affa10528c348e6097644f
Author: Phani Kumar <[email protected]>
AuthorDate: Tue Mar 21 18:30:16 2023 +0530

    Add deferrable mode to `RedshiftResumeClusterOperator` (#30090)
    
    This PR donates the RedshiftResumeClusterOperatorAsync from 
astronomer-providers to Airflow.
---
 .../providers/amazon/aws/hooks/redshift_cluster.py | 29 ++++++++++
 .../amazon/aws/operators/redshift_cluster.py       | 63 +++++++++++++++++-----
 .../operators/redshift_cluster.rst                 |  1 +
 .../amazon/aws/operators/test_redshift_cluster.py  | 50 +++++++++++++++++
 .../providers/amazon/aws/example_redshift.py       |  7 +++
 5 files changed, 137 insertions(+), 13 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/redshift_cluster.py 
b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
index 84e97be380..962b3d4b21 100644
--- a/airflow/providers/amazon/aws/hooks/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py
@@ -256,6 +256,35 @@ class RedshiftAsyncHook(AwsBaseAsyncHook):
         except botocore.exceptions.ClientError as error:
             return {"status": "error", "message": str(error)}
 
+    async def resume_cluster(
+        self,
+        cluster_identifier: str,
+        polling_period_seconds: float = 5.0,
+    ) -> dict[str, Any]:
+        """
+        Connects to the AWS Redshift cluster via aiobotocore and
+        resumes the cluster for the given cluster_identifier.
+
+        :param cluster_identifier: unique identifier of a cluster
+        :param polling_period_seconds: polling period in seconds to check for 
the status
+        """
+        async with await self.get_client_async() as client:
+            try:
+                response = await 
client.resume_cluster(ClusterIdentifier=cluster_identifier)
+                status = response["Cluster"]["ClusterStatus"] if response and 
response["Cluster"] else None
+                if status == "resuming":
+                    flag = asyncio.Event()
+                    while True:
+                        expected_response = await asyncio.create_task(
+                            self.get_cluster_status(cluster_identifier, 
"available", flag)
+                        )
+                        await asyncio.sleep(polling_period_seconds)
+                        if flag.is_set():
+                            return expected_response
+                return {"status": "error", "cluster_state": status}
+            except botocore.exceptions.ClientError as error:
+                return {"status": "error", "message": str(error)}
+
     async def get_cluster_status(
         self,
         cluster_identifier: str,
diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py 
b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 01fdea153a..9f553fa129 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -397,8 +397,11 @@ class RedshiftResumeClusterOperator(BaseOperator):
         For more information on how to use this operator, take a look at the 
guide:
         :ref:`howto/operator:RedshiftResumeClusterOperator`
 
-    :param cluster_identifier: id of the AWS Redshift Cluster
-    :param aws_conn_id: aws connection to use
+    :param cluster_identifier:  Unique identifier of the AWS Redshift cluster
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        The default connection id is ``aws_default``
+    :param deferrable: Run operator in deferrable mode
+    :param poll_interval: Time (in seconds) to wait between two consecutive 
calls to check cluster state
     """
 
     template_fields: Sequence[str] = ("cluster_identifier",)
@@ -410,11 +413,15 @@ class RedshiftResumeClusterOperator(BaseOperator):
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        deferrable: bool = False,
+        poll_interval: int = 10,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
         self.aws_conn_id = aws_conn_id
+        self.deferrable = deferrable
+        self.poll_interval = poll_interval
         # These parameters are added to address an issue with the boto3 API 
where the API
         # prematurely reports the cluster as available to receive requests. 
This causes the cluster
         # to reject initial attempts to resume the cluster despite reporting 
the correct state.
@@ -424,18 +431,48 @@ class RedshiftResumeClusterOperator(BaseOperator):
     def execute(self, context: Context):
         redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
 
-        while self._attempts >= 1:
-            try:
-                
redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
-                return
-            except 
redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
-                self._attempts = self._attempts - 1
+        if self.deferrable:
+            self.defer(
+                timeout=self.execution_timeout,
+                trigger=RedshiftClusterTrigger(
+                    task_id=self.task_id,
+                    poll_interval=self.poll_interval,
+                    aws_conn_id=self.aws_conn_id,
+                    cluster_identifier=self.cluster_identifier,
+                    attempts=self._attempts,
+                    operation_type="pause_cluster",
+                ),
+                method_name="execute_complete",
+            )
+        else:
+            while self._attempts >= 1:
+                try:
+                    
redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
+                    return
+                except 
redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
+                    self._attempts = self._attempts - 1
 
-                if self._attempts > 0:
-                    self.log.error("Unable to resume cluster. %d attempts 
remaining.", self._attempts)
-                    time.sleep(self._attempt_interval)
-                else:
-                    raise error
+                    if self._attempts > 0:
+                        self.log.error("Unable to resume cluster. %d attempts 
remaining.", self._attempts)
+                        time.sleep(self._attempt_interval)
+                    else:
+                        raise error
+
+    def execute_complete(self, context: Context, event: Any = None) -> None:
+        """
+        Callback for when the trigger fires; returns immediately.
+        Relies on the trigger to raise an exception, otherwise it assumes 
the execution was
+        """
+        if event:
+            if "status" in event and event["status"] == "error":
+                msg = f"{event['status']}: {event['message']}"
+                raise AirflowException(msg)
+            elif "status" in event and event["status"] == "success":
+                self.log.info("%s completed successfully.", self.task_id)
+                self.log.info("Paused cluster successfully")
+        else:
+            raise AirflowException("No event received from trigger")
 
 
 class RedshiftPauseClusterOperator(BaseOperator):
diff --git 
a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst 
b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
index b85eddd2c3..d61aa4acfc 100644
--- a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
+++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst
@@ -53,6 +53,7 @@ Resume an Amazon Redshift cluster
 
To resume a 'paused' Amazon Redshift cluster you can use
:class:`RedshiftResumeClusterOperator 
<airflow.providers.amazon.aws.operators.redshift_cluster>`.
+You can also run this operator in deferrable mode by setting the 
``deferrable`` parameter to ``True``.
 
 .. exampleinclude:: 
/../../tests/system/providers/amazon/aws/example_redshift.py
     :language: python
diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py 
b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
index e2d6a16888..8ec6f67eef 100644
--- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
@@ -258,6 +258,56 @@ class TestResumeClusterOperator:
             redshift_operator.execute(None)
         assert mock_conn.resume_cluster.call_count == 10
 
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftAsyncHook.resume_cluster")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftAsyncHook.get_client_async")
+    def test_resume_cluster(self, mock_async_client, 
mock_async_resume_cluster, mock_sync_cluster_status):
+        """Test Resume cluster operator run"""
+        mock_sync_cluster_status.return_value = "paused"
+        mock_async_client.return_value.resume_cluster.return_value = {
+            "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": 
"resuming"}
+        }
+        mock_async_resume_cluster.return_value = {"status": "success", 
"cluster_state": "available"}
+
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test",
+            cluster_identifier="test_cluster",
+            aws_conn_id="aws_conn_test",
+            deferrable=True,
+        )
+
+        with pytest.raises(TaskDeferred) as exc:
+            redshift_operator.execute({})
+
+        assert isinstance(
+            exc.value.trigger, RedshiftClusterTrigger
+        ), "Trigger is not a RedshiftClusterTrigger"
+
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftAsyncHook.resume_cluster")
+    
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftAsyncHook.get_client_async")
+    def test_resume_cluster_failure(
+        self, mock_async_client, mock_async_resume_cluster, 
mock_sync_cluster_statue
+    ):
+        """Test Resume cluster operator Failure"""
+        mock_sync_cluster_statue.return_value = "paused"
+        mock_async_client.return_value.resume_cluster.return_value = {
+            "Cluster": {"ClusterIdentifier": "test_cluster", "ClusterStatus": 
"resuming"}
+        }
+        mock_async_resume_cluster.return_value = {"status": "success", 
"cluster_state": "available"}
+
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test",
+            cluster_identifier="test_cluster",
+            aws_conn_id="aws_conn_test",
+            deferrable=True,
+        )
+
+        with pytest.raises(AirflowException):
+            redshift_operator.execute_complete(
+                context=None, event={"status": "error", "message": "test 
failure message"}
+            )
+
 
 class TestPauseClusterOperator:
     def test_init(self):
diff --git a/tests/system/providers/amazon/aws/example_redshift.py 
b/tests/system/providers/amazon/aws/example_redshift.py
index 7e3a5a9280..8591e2fcdf 100644
--- a/tests/system/providers/amazon/aws/example_redshift.py
+++ b/tests/system/providers/amazon/aws/example_redshift.py
@@ -176,6 +176,12 @@ with DAG(
         task_id="resume_cluster",
         cluster_identifier=redshift_cluster_identifier,
     )
+
+    resume_cluster_in_deferred_mode = RedshiftResumeClusterOperator(
+        task_id="resume_cluster_in_deferred_mode",
+        cluster_identifier=redshift_cluster_identifier,
+        deferrable=True,
+    )
     # [END howto_operator_redshift_resume_cluster]
 
     wait_cluster_available_after_resume = RedshiftClusterSensor(
@@ -279,6 +285,7 @@ with DAG(
         pause_cluster,
         wait_cluster_paused,
         resume_cluster,
+        resume_cluster_in_deferred_mode,
         wait_cluster_available_after_resume,
         set_up_connection,
         create_table_redshift_data,

Reply via email to