vincbeck commented on code in PR #30032:
URL: https://github.com/apache/airflow/pull/30032#discussion_r1171912516


##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and 
familiarize yourself with the official 
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html)
 of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing 
Amazon Provider Package (AMPP) operators to deferrable operators. Due to the 
varied complexities of available operators, it is impossible to define one 
method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, 
but it is important to study each operator before determining whether the steps 
outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 
stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, 
parameters are formatted etc. The complexity of this stage depends on the 
complexity of the task the operator is attempting to do. Some operators (e.g. 
Sagemaker) have a lot of pre-processing, whereas others require little to no 
pre-processing.
+2. The "main" call to the boto3 API to start an operation. This is the task 
that the operator is attempting to complete. This could be a request to 
provision a resource, request to change the state of a resource, start a job on 
a resource etc. Regardless of the operation, the boto3 API returns a response 
instantly (ignoring network delays) with a response detailing the results of 
the query. For example, in the case of a resource provisioning request, 
although the resource can take significant time to be allocated, the boto3 API 
returns a response to the caller without waiting for the operation to be 
completed.
+3. The last, often optional, stage is to wait for the operation initiated in 
stage 2 to be completed. This usually involves polling the boto3 API at set 
intervals, and waiting for a certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a 
trigger which can handle the polling operation. The botocore library defines 
waiters for certain services, which are built-in functions that poll a service 
and wait for a given criteria to be met.
+As part of our work for writing deferrable operators, we have extended the 
built-in waiters to allow custom waiters, which follow the same logic, but for 
services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement 
the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add 
`deferrable` as a parameter to the operator, and initialize it in the 
constructor of the operator.
+The next step is to determine where the operator should be deferred. This will 
be dependent on what the operator does, and how it is written. Although every 
operator is different, there are a few guidelines to determine the best place 
to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` 
method should be called right before the check for wait_for_completion .
+2. If there is no `wait_for_completion` , look for the "main" task that the 
operator does. Often, operators will make various describe calls to to the 
boto3 API to verify certain conditions, or look up some information before 
performing its "main" task. Often, right after the "main" call to the boto3 API 
is made is a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` 
method if the `deferrable` flag is `True`. The `self.defer` method takes in 
several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We 
will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute 
once the trigger completes its execution. The trigger cannot pass the execution 
back to the execute method of the operator. By convention, the name for this 
method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the 
Trigger can execute for before timing out. This defaults to `None`, meaning no 
timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is 
`{}`.
+
+The Trigger is the main component of deferrable operators. They must be placed 
in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must 
implement the following 3 methods:
+
+1. `__init__`: the constructor which receives parameters from the operator. 
These must be JSON serializable.
+2. `serialize`: a function that returns the classpath, as well as keyword 
arguments to the `__init__`  method as a tuple
+3. `run` : the asynchronous function that is responsible for awaiting the 
asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the 
"main" task of the operator, and is now waiting for a certain resource to reach 
a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many 
services, which implements a `wait` method that can be configured via a config 
file to poll the boto3 API at set intervals, and return if the success criteria 
is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, 
defines an `AIOWaiter` class, which also implements a wait method that behaves 
identical to the botocore method, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can 
be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the 
particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the async_conn property to get access to the 
aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in cluster_available waiter. If we wanted 
to use a custom waiter, we would change the code slightly to use the get_waiter 
function from the hook, rather than the aiobotocore client:

Review Comment:
   ```suggestion
   In this case, we are using the built-in cluster_available waiter. If we 
wanted to use a custom waiter, we would change the code slightly to use the 
`get_waiter` function from the hook, rather than the aiobotocore client:
   ```



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -140,6 +143,7 @@ def __init__(
         wait_for_completion: bool = False,
         max_attempt: int = 5,
         poll_interval: int = 60,
+        deferrable: bool = False,

Review Comment:
   docstring



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and 
familiarize yourself with the official 
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html)
 of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing 
Amazon Provider Package (AMPP) operators to deferrable operators. Due to the 
varied complexities of available operators, it is impossible to define one 
method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, 
but it is important to study each operator before determining whether the steps 
outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 
stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, 
parameters are formatted etc. The complexity of this stage depends on the 
complexity of the task the operator is attempting to do. Some operators (e.g. 
Sagemaker) have a lot of pre-processing, whereas others require little to no 
pre-processing.
+2. The "main" call to the boto3 API to start an operation. This is the task 
that the operator is attempting to complete. This could be a request to 
provision a resource, request to change the state of a resource, start a job on 
a resource etc. Regardless of the operation, the boto3 API returns a response 
instantly (ignoring network delays) with a response detailing the results of 
the query. For example, in the case of a resource provisioning request, 
although the resource can take significant time to be allocated, the boto3 API 
returns a response to the caller without waiting for the operation to be 
completed.
+3. The last, often optional, stage is to wait for the operation initiated in 
stage 2 to be completed. This usually involves polling the boto3 API at set 
intervals, and waiting for a certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a 
trigger which can handle the polling operation. The botocore library defines 
waiters for certain services, which are built-in functions that poll a service 
and wait for a given criteria to be met.
+As part of our work for writing deferrable operators, we have extended the 
built-in waiters to allow custom waiters, which follow the same logic, but for 
services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement 
the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add 
`deferrable` as a parameter to the operator, and initialize it in the 
constructor of the operator.
+The next step is to determine where the operator should be deferred. This will 
be dependent on what the operator does, and how it is written. Although every 
operator is different, there are a few guidelines to determine the best place 
to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` 
method should be called right before the check for wait_for_completion .
+2. If there is no `wait_for_completion` , look for the "main" task that the 
operator does. Often, operators will make various describe calls to to the 
boto3 API to verify certain conditions, or look up some information before 
performing its "main" task. Often, right after the "main" call to the boto3 API 
is made is a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` 
method if the `deferrable` flag is `True`. The `self.defer` method takes in 
several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We 
will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute 
once the trigger completes its execution. The trigger cannot pass the execution 
back to the execute method of the operator. By convention, the name for this 
method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the 
Trigger can execute for before timing out. This defaults to `None`, meaning no 
timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is 
`{}`.
+
+The Trigger is the main component of deferrable operators. They must be placed 
in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must 
implement the following 3 methods:
+
+1. `__init__`: the constructor which receives parameters from the operator. 
These must be JSON serializable.
+2. `serialize`: a function that returns the classpath, as well as keyword 
arguments to the `__init__`  method as a tuple
+3. `run` : the asynchronous function that is responsible for awaiting the 
asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the 
"main" task of the operator, and is now waiting for a certain resource to reach 
a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many 
services, which implements a `wait` method that can be configured via a config 
file to poll the boto3 API at set intervals, and return if the success criteria 
is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, 
defines an `AIOWaiter` class, which also implements a wait method that behaves 
identical to the botocore method, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can 
be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the 
particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the async_conn property to get access to the 
aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in cluster_available waiter. If we wanted 
to use a custom waiter, we would change the code slightly to use the get_waiter 
function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, 
client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the get_waiter function defined in base_aws.py which 
takes an optional argument of deferrable (set to True), and the aiobotocore 
client. cluster_paused is a custom boto waiter defined in redshift.json  in the 
airflow/providers/amazon/aws/waiters folder. In general, the config file for a 
custom waiter should be named as <service_name>.json. The config for 
cluster_paused is shown below:
+
+```json
+{
+    "version": 2,
+    "waiters": {
+        "cluster_paused": {
+            "operation": "DescribeClusters",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "pathAll",
+                    "argument": "Clusters[].ClusterStatus",
+                    "expected": "paused",
+                    "state": "success"
+                },
+                {
+                    "expected": "ClusterNotFound",
+                    "matcher": "error",
+                    "state": "retry"
+                },
+                {
+                    "expected": "deleting",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "Clusters[].ClusterStatus"
+                }
+            ]
+        },
+    }
+}
+```
+
+For more information about writing custom waiter, see the 
[README.md](https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/waiters/README.md)
 for custom waiters.
+
+In some cases, a built-in or custom waiter may not be able to solve the 
problem. In such cases, the asynchronous method used to poll the boto3 API 
would need to be defined in the hook of the service being used. This method is 
essentially the same as the synchronous version of the method, except that it 
will use the aiobotocore client, and will be awaited. For the Redshift example, 
the async describe_clusters method would look as follows:

Review Comment:
   ```suggestion
   In some cases, a built-in or custom waiter may not be able to solve the 
problem. In such cases, the asynchronous method used to poll the boto3 API 
would need to be defined in the hook of the service being used. This method is 
essentially the same as the synchronous version of the method, except that it 
will use the aiobotocore client, and will be awaited. For the Redshift example, 
the async `describe_clusters` method would look as follows:
   ```



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and 
familiarize yourself with the official 
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html)
 of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing 
Amazon Provider Package (AMPP) operators to deferrable operators. Due to the 
varied complexities of available operators, it is impossible to define one 
method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, 
but it is important to study each operator before determining whether the steps 
outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 
stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, 
parameters are formatted etc. The complexity of this stage depends on the 
complexity of the task the operator is attempting to do. Some operators (e.g. 
Sagemaker) have a lot of pre-processing, whereas others require little to no 
pre-processing.
+2. The "main" call to the boto3 API to start an operation. This is the task 
that the operator is attempting to complete. This could be a request to 
provision a resource, request to change the state of a resource, start a job on 
a resource etc. Regardless of the operation, the boto3 API returns a response 
instantly (ignoring network delays) with a response detailing the results of 
the query. For example, in the case of a resource provisioning request, 
although the resource can take significant time to be allocated, the boto3 API 
returns a response to the caller without waiting for the operation to be 
completed.
+3. The last, often optional, stage is to wait for the operation initiated in 
stage 2 to be completed. This usually involves polling the boto3 API at set 
intervals, and waiting for a certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a 
trigger which can handle the polling operation. The botocore library defines 
waiters for certain services, which are built-in functions that poll a service 
and wait for a given criteria to be met.
+As part of our work for writing deferrable operators, we have extended the 
built-in waiters to allow custom waiters, which follow the same logic, but for 
services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement 
the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add 
`deferrable` as a parameter to the operator, and initialize it in the 
constructor of the operator.
+The next step is to determine where the operator should be deferred. This will 
be dependent on what the operator does, and how it is written. Although every 
operator is different, there are a few guidelines to determine the best place 
to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` 
method should be called right before the check for wait_for_completion .
+2. If there is no `wait_for_completion` , look for the "main" task that the 
operator does. Often, operators will make various describe calls to to the 
boto3 API to verify certain conditions, or look up some information before 
performing its "main" task. Often, right after the "main" call to the boto3 API 
is made is a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` 
method if the `deferrable` flag is `True`. The `self.defer` method takes in 
several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We 
will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute 
once the trigger completes its execution. The trigger cannot pass the execution 
back to the execute method of the operator. By convention, the name for this 
method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the 
Trigger can execute for before timing out. This defaults to `None`, meaning no 
timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is 
`{}`.
+
+The Trigger is the main component of deferrable operators. They must be placed 
in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must 
implement the following 3 methods:
+
+1. `__init__`: the constructor which receives parameters from the operator. 
These must be JSON serializable.
+2. `serialize`: a function that returns the classpath, as well as keyword 
arguments to the `__init__`  method as a tuple
+3. `run` : the asynchronous function that is responsible for awaiting the 
asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the 
"main" task of the operator, and is now waiting for a certain resource to reach 
a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many 
services, which implements a `wait` method that can be configured via a config 
file to poll the boto3 API at set intervals, and return if the success criteria 
is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, 
defines an `AIOWaiter` class, which also implements a wait method that behaves 
identical to the botocore method, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can 
be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the 
particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the async_conn property to get access to the 
aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in cluster_available waiter. If we wanted 
to use a custom waiter, we would change the code slightly to use the get_waiter 
function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, 
client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the get_waiter function defined in base_aws.py which 
takes an optional argument of deferrable (set to True), and the aiobotocore 
client. cluster_paused is a custom boto waiter defined in redshift.json  in the 
airflow/providers/amazon/aws/waiters folder. In general, the config file for a 
custom waiter should be named as <service_name>.json. The config for 
cluster_paused is shown below:
+
+```json
+{
+    "version": 2,
+    "waiters": {
+        "cluster_paused": {
+            "operation": "DescribeClusters",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "pathAll",
+                    "argument": "Clusters[].ClusterStatus",
+                    "expected": "paused",
+                    "state": "success"
+                },
+                {
+                    "expected": "ClusterNotFound",
+                    "matcher": "error",
+                    "state": "retry"
+                },
+                {
+                    "expected": "deleting",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "Clusters[].ClusterStatus"
+                }
+            ]
+        },
+    }
+}
+```
+
+For more information about writing custom waiter, see the 
[README.md](https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/waiters/README.md)
 for custom waiters.
+
+In some cases, a built-in or custom waiter may not be able to solve the 
problem. In such cases, the asynchronous method used to poll the boto3 API 
would need to be defined in the hook of the service being used. This method is 
essentially the same as the synchronous version of the method, except that it 
will use the aiobotocore client, and will be awaited. For the Redshift example, 
the async describe_clusters method would look as follows:
+
+```python
+async with self.async_conn as client:
+    response = 
client.describe_clusters(ClusterIdentifier=self.cluster_identifier)
+```
+
+This async method can be used in the Trigger to poll the boto3 API. The 
polling logic will need to be implemented manually, taking care to use 
asyncio.sleep() rather than time.sleep().
+
+The last step in the Trigger is to yield a TriggerEvent that will be used to 
alert the Triggerer that the Trigger has finished execution. The TriggerEvent 
can pass information from the trigger to the method_name method named in the 
self.defer call in the operator. In the Redshift example, the TriggerEvent 
would look as follows:

Review Comment:
   ```suggestion
   The last step in the Trigger is to yield a `TriggerEvent` that will be used 
to alert the `Triggerer` that the Trigger has finished execution. The 
`TriggerEvent` can pass information from the trigger to the `method_name` 
method named in the `self.defer` call in the operator. In the Redshift example, 
the `TriggerEvent` would look as follows:
   ```



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and 
familiarize yourself with the official 
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html)
 of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing 
Amazon Provider Package (AMPP) operators to deferrable operators. Due to the 
varied complexities of available operators, it is impossible to define one 
method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, 
but it is important to study each operator before determining whether the steps 
outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 
stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, 
parameters are formatted etc. The complexity of this stage depends on the 
complexity of the task the operator is attempting to do. Some operators (e.g. 
Sagemaker) have a lot of pre-processing, whereas others require little to no 
pre-processing.
+2. The "main" call to the boto3 API to start an operation. This is the task 
that the operator is attempting to complete. This could be a request to 
provision a resource, request to change the state of a resource, start a job on 
a resource etc. Regardless of the operation, the boto3 API returns a response 
instantly (ignoring network delays) with a response detailing the results of 
the query. For example, in the case of a resource provisioning request, 
although the resource can take significant time to be allocated, the boto3 API 
returns a response to the caller without waiting for the operation to be 
completed.
+3. The last, often optional, stage is to wait for the operation initiated in 
stage 2 to be completed. This usually involves polling the boto3 API at set 
intervals, and waiting for a certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a 
trigger which can handle the polling operation. The botocore library defines 
waiters for certain services, which are built-in functions that poll a service 
and wait for a given criteria to be met.
+As part of our work for writing deferrable operators, we have extended the 
built-in waiters to allow custom waiters, which follow the same logic, but for 
services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement 
the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add 
`deferrable` as a parameter to the operator, and initialize it in the 
constructor of the operator.
+The next step is to determine where the operator should be deferred. This will 
be dependent on what the operator does, and how it is written. Although every 
operator is different, there are a few guidelines to determine the best place 
to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` 
method should be called right before the check for wait_for_completion .
+2. If there is no `wait_for_completion` , look for the "main" task that the 
operator does. Often, operators will make various describe calls to to the 
boto3 API to verify certain conditions, or look up some information before 
performing its "main" task. Often, right after the "main" call to the boto3 API 
is made is a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` 
method if the `deferrable` flag is `True`. The `self.defer` method takes in 
several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We 
will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute 
once the trigger completes its execution. The trigger cannot pass the execution 
back to the execute method of the operator. By convention, the name for this 
method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the 
Trigger can execute for before timing out. This defaults to `None`, meaning no 
timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is 
`{}`.
+
+The Trigger is the main component of deferrable operators. They must be placed 
in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must 
implement the following 3 methods:
+
+1. `__init__`: the constructor which receives parameters from the operator. 
These must be JSON serializable.
+2. `serialize`: a function that returns the classpath, as well as keyword 
arguments to the `__init__`  method as a tuple
+3. `run` : the asynchronous function that is responsible for awaiting the 
asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the 
"main" task of the operator, and is now waiting for a certain resource to reach 
a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many 
services, which implements a `wait` method that can be configured via a config 
file to poll the boto3 API at set intervals, and return if the success criteria 
is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, 
defines an `AIOWaiter` class, which also implements a wait method that behaves 
identical to the botocore method, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can 
be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the 
particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the async_conn property to get access to the 
aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in cluster_available waiter. If we wanted 
to use a custom waiter, we would change the code slightly to use the get_waiter 
function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, 
client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the get_waiter function defined in base_aws.py which 
takes an optional argument of deferrable (set to True), and the aiobotocore 
client. cluster_paused is a custom boto waiter defined in redshift.json  in the 
airflow/providers/amazon/aws/waiters folder. In general, the config file for a 
custom waiter should be named as <service_name>.json. The config for 
cluster_paused is shown below:
+
+```json
+{
+    "version": 2,
+    "waiters": {
+        "cluster_paused": {
+            "operation": "DescribeClusters",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "pathAll",
+                    "argument": "Clusters[].ClusterStatus",
+                    "expected": "paused",
+                    "state": "success"
+                },
+                {
+                    "expected": "ClusterNotFound",
+                    "matcher": "error",
+                    "state": "retry"
+                },
+                {
+                    "expected": "deleting",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "Clusters[].ClusterStatus"
+                }
+            ]
+        },
+    }
+}
+```
+
+For more information about writing custom waiter, see the 
[README.md](https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/waiters/README.md)
 for custom waiters.
+
+In some cases, a built-in or custom waiter may not be able to solve the 
problem. In such cases, the asynchronous method used to poll the boto3 API 
would need to be defined in the hook of the service being used. This method is 
essentially the same as the synchronous version of the method, except that it 
will use the aiobotocore client, and will be awaited. For the Redshift example, 
the async describe_clusters method would look as follows:
+
+```python
+async with self.async_conn as client:
+    response = 
client.describe_clusters(ClusterIdentifier=self.cluster_identifier)
+```
+
+This async method can be used in the Trigger to poll the boto3 API. The 
polling logic will need to be implemented manually, taking care to use 
asyncio.sleep() rather than time.sleep().
+
+The last step in the Trigger is to yield a TriggerEvent that will be used to 
alert the Triggerer that the Trigger has finished execution. The TriggerEvent 
can pass information from the trigger to the method_name method named in the 
self.defer call in the operator. In the Redshift example, the TriggerEvent 
would look as follows:
+
+```
+yield TriggerEvent({"status": "success", "message": "Cluster Created"})
+```
+
+The object passed through the TrigggerEvent can be captured in the method_name 
method through an event parameter. This can be used to determine what needs to 
be done based on the outcome of the Trigger execution. In the Redshift case, we 
can simply check the status of the event, and raise an Exception if something 
went wrong.

Review Comment:
   ```suggestion
   The object passed through the `TrigggerEvent` can be captured in the 
`method_name` method through an event parameter. This can be used to determine 
what needs to be done based on the outcome of the Trigger execution. In the 
Redshift case, we can simply check the status of the event, and raise an 
Exception if something went wrong.
   ```



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -252,6 +257,16 @@ def execute(self, context: Context):
             self.master_user_password,
             params,
         )
+        if self.deferrable:

Review Comment:
   I dont see this new branch being tested in unit test. Do you test it 
somewhere?



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and 
familiarize yourself with the official 
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html)
 of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing 
Amazon Provider Package (AMPP) operators to deferrable operators. Due to the 
varied complexities of available operators, it is impossible to define one 
method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, 
but it is important to study each operator before determining whether the steps 
outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 
stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, 
parameters are formatted etc. The complexity of this stage depends on the 
complexity of the task the operator is attempting to do. Some operators (e.g. 
Sagemaker) have a lot of pre-processing, whereas others require little to no 
pre-processing.
+2. The "main" call to the boto3 API to start an operation. This is the task 
that the operator is attempting to complete. This could be a request to 
provision a resource, request to change the state of a resource, start a job on 
a resource etc. Regardless of the operation, the boto3 API returns a response 
instantly (ignoring network delays) with a response detailing the results of 
the query. For example, in the case of a resource provisioning request, 
although the resource can take significant time to be allocated, the boto3 API 
returns a response to the caller without waiting for the operation to be 
completed.
+3. The last, often optional, stage is to wait for the operation initiated in 
stage 2 to be completed. This usually involves polling the boto3 API at set 
intervals, and waiting for a certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a 
trigger which can handle the polling operation. The botocore library defines 
waiters for certain services, which are built-in functions that poll a service 
and wait for a given criteria to be met.
+As part of our work for writing deferrable operators, we have extended the 
built-in waiters to allow custom waiters, which follow the same logic, but for 
services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement 
the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add 
`deferrable` as a parameter to the operator, and initialize it in the 
constructor of the operator.
+The next step is to determine where the operator should be deferred. This will 
be dependent on what the operator does, and how it is written. Although every 
operator is different, there are a few guidelines to determine the best place 
to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` 
method should be called right before the check for wait_for_completion .
+2. If there is no `wait_for_completion` , look for the "main" task that the 
operator does. Often, operators will make various describe calls to to the 
boto3 API to verify certain conditions, or look up some information before 
performing its "main" task. Often, right after the "main" call to the boto3 API 
is made is a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` 
method if the `deferrable` flag is `True`. The `self.defer` method takes in 
several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We 
will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute 
once the trigger completes its execution. The trigger cannot pass the execution 
back to the execute method of the operator. By convention, the name for this 
method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the 
Trigger can execute for before timing out. This defaults to `None`, meaning no 
timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is 
`{}`.
+
+The Trigger is the main component of deferrable operators. They must be placed 
in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must 
implement the following 3 methods:
+
+1. `__init__`: the constructor which receives parameters from the operator. 
These must be JSON serializable.
+2. `serialize`: a function that returns the classpath, as well as keyword 
arguments to the `__init__`  method as a tuple
+3. `run` : the asynchronous function that is responsible for awaiting the 
asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the 
"main" task of the operator, and is now waiting for a certain resource to reach 
a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many 
services, which implements a `wait` method that can be configured via a config 
file to poll the boto3 API at set intervals, and return if the success criteria 
is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, 
defines an `AIOWaiter` class, which also implements a wait method that behaves 
identical to the botocore method, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can 
be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the 
particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the async_conn property to get access to the 
aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in cluster_available waiter. If we wanted 
to use a custom waiter, we would change the code slightly to use the get_waiter 
function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, 
client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the get_waiter function defined in base_aws.py which 
takes an optional argument of deferrable (set to True), and the aiobotocore 
client. cluster_paused is a custom boto waiter defined in redshift.json  in the 
airflow/providers/amazon/aws/waiters folder. In general, the config file for a 
custom waiter should be named as <service_name>.json. The config for 
cluster_paused is shown below:
+
+```json
+{
+    "version": 2,
+    "waiters": {
+        "cluster_paused": {
+            "operation": "DescribeClusters",
+            "delay": 30,
+            "maxAttempts": 60,
+            "acceptors": [
+                {
+                    "matcher": "pathAll",
+                    "argument": "Clusters[].ClusterStatus",
+                    "expected": "paused",
+                    "state": "success"
+                },
+                {
+                    "expected": "ClusterNotFound",
+                    "matcher": "error",
+                    "state": "retry"
+                },
+                {
+                    "expected": "deleting",
+                    "matcher": "pathAny",
+                    "state": "failure",
+                    "argument": "Clusters[].ClusterStatus"
+                }
+            ]
+        },
+    }
+}
+```
+
+For more information about writing custom waiter, see the 
[README.md](https://github.com/apache/airflow/blob/main/airflow/providers/amazon/aws/waiters/README.md)
 for custom waiters.
+
+In some cases, a built-in or custom waiter may not be able to solve the 
problem. In such cases, the asynchronous method used to poll the boto3 API 
would need to be defined in the hook of the service being used. This method is 
essentially the same as the synchronous version of the method, except that it 
will use the aiobotocore client, and will be awaited. For the Redshift example, 
the async describe_clusters method would look as follows:
+
+```python
+async with self.async_conn as client:
+    response = 
client.describe_clusters(ClusterIdentifier=self.cluster_identifier)
+```
+
+This async method can be used in the Trigger to poll the boto3 API. The 
polling logic will need to be implemented manually, taking care to use 
asyncio.sleep() rather than time.sleep().

Review Comment:
   ```suggestion
   This async method can be used in the Trigger to poll the boto3 API. The 
polling logic will need to be implemented manually, taking care to use 
`asyncio.sleep()` rather than `time.sleep()`.
   ```



##########
airflow/providers/amazon/aws/triggers/README.md:
##########
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# Writing Deferrable Operators for Amazon Provider Package
+
+
+Before writing deferrable operators, it is strongly recommended to read and 
familiarize yourself with the official 
[documentation](https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html)
 of Deferrable Operators.
+The purpose of this guide is to provide a standardized way to convert existing 
Amazon Provider Package (AMPP) operators to deferrable operators. Due to the 
varied complexities of available operators, it is impossible to define one 
method that will work for every operator.
+The method described in this guide should work for many of the AMPP operators, 
but it is important to study each operator before determining whether the steps 
outlined below are applicable.
+
+Although it varies from operator to operator, a typical AMPP operator has 3 
stages:
+
+1. A pre-processing stage, where information is looked up via boto3 API calls, 
parameters are formatted etc. The complexity of this stage depends on the 
complexity of the task the operator is attempting to do. Some operators (e.g. 
Sagemaker) have a lot of pre-processing, whereas others require little to no 
pre-processing.
+2. The "main" call to the boto3 API to start an operation. This is the task 
that the operator is attempting to complete. This could be a request to 
provision a resource, request to change the state of a resource, start a job on 
a resource etc. Regardless of the operation, the boto3 API returns a response 
instantly (ignoring network delays) with a response detailing the results of 
the query. For example, in the case of a resource provisioning request, 
although the resource can take significant time to be allocated, the boto3 API 
returns a response to the caller without waiting for the operation to be 
completed.
+3. The last, often optional, stage is to wait for the operation initiated in 
stage 2 to be completed. This usually involves polling the boto3 API at set 
intervals, and waiting for a certain criteria to be met.
+
+In general, it is the last polling stage where we can defer the operator to a 
trigger which can handle the polling operation. The botocore library defines 
waiters for certain services, which are built-in functions that poll a service 
and wait for a given criteria to be met.
+As part of our work for writing deferrable operators, we have extended the 
built-in waiters to allow custom waiters, which follow the same logic, but for 
services not included in the botocore library.
+We can use these custom waiters, along with the built-in waiters to implement 
the polling logic of the deferrable operators.
+
+The first step to making an existing operator deferrable is to add 
`deferrable` as a parameter to the operator, and initialize it in the 
constructor of the operator.
+The next step is to determine where the operator should be deferred. This will 
be dependent on what the operator does, and how it is written. Although every 
operator is different, there are a few guidelines to determine the best place 
to defer an operator.
+
+1. If the operator has a `wait_for_completion` parameter, the `self.defer` 
method should be called right before the check for wait_for_completion .
+2. If there is no `wait_for_completion` , look for the "main" task that the 
operator does. Often, operators will make various describe calls to to the 
boto3 API to verify certain conditions, or look up some information before 
performing its "main" task. Often, right after the "main" call to the boto3 API 
is made is a good place to call `self.defer`.
+
+
+Once the location to defer is decided in the operator, call the `self.defer` 
method if the `deferrable` flag is `True`. The `self.defer` method takes in 
several parameters, listed below:
+
+1. `trigger`: This is the trigger which you want to pass the execution to. We 
will write this trigger in just a moment.
+2. `method_name`: This specifies the name of the method you want to execute 
once the trigger completes its execution. The trigger cannot pass the execution 
back to the execute method of the operator. By convention, the name for this 
method is `execute_complete`.
+3. `timeout`: An optional parameter that controls the length of time the 
Trigger can execute for before timing out. This defaults to `None`, meaning no 
timeout.
+4. `kwargs`: Additional keyword arguments to pass to `method_name`. Default is 
`{}`.
+
+The Trigger is the main component of deferrable operators. They must be placed 
in the `airflow/providers/amazon/aws/triggers/` folder. All Triggers must 
implement the following 3 methods:
+
+1. `__init__`: the constructor which receives parameters from the operator. 
These must be JSON serializable.
+2. `serialize`: a function that returns the classpath, as well as keyword 
arguments to the `__init__`  method as a tuple
+3. `run` : the asynchronous function that is responsible for awaiting the 
asynchronous operations.
+
+Ideally, when the operator has deferred itself, it has already initiated the 
"main" task of the operator, and is now waiting for a certain resource to reach 
a certain state.
+As mentioned earlier, the botocore library defines a `Waiter` class for many 
services, which implements a `wait` method that can be configured via a config 
file to poll the boto3 API at set intervals, and return if the success criteria 
is met.
+The aiobotocore library, which is used to make asynchronous botocore calls, 
defines an `AIOWaiter` class, which also implements a wait method that behaves 
identical to the botocore method, except that it works asynchronously.
+Therefore, any botocore waiter is available as an aiobotocore waiter, and can 
be used to asynchronously poll a service until the desired criteria is met.
+
+To call the asynchronous `wait` function, first create a hook for the 
particular service. For example, for a Redshift hook, it would look like this:
+
+```python
+self.redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+```
+
+With this hook, we can use the async_conn property to get access to the 
aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    await client.get_waiter("cluster_available").wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+In this case, we are using the built-in cluster_available waiter. If we wanted 
to use a custom waiter, we would change the code slightly to use the get_waiter 
function from the hook, rather than the aiobotocore client:
+
+```python
+async with self.redshift_hook.async_conn as client:
+    waiter = self.redshift_hook.get_waiter("cluster_paused", deferrable=True, 
client=client)
+    await waiter.wait(
+        ClusterIdentifier=self.cluster_identifier,
+        WaiterConfig={
+            "Delay": int(self.poll_interval),
+            "MaxAttempts": int(self.max_attempt),
+        },
+    )
+```
+
+Here, we are calling the get_waiter function defined in base_aws.py which 
takes an optional argument of deferrable (set to True), and the aiobotocore 
client. cluster_paused is a custom boto waiter defined in redshift.json  in the 
airflow/providers/amazon/aws/waiters folder. In general, the config file for a 
custom waiter should be named as <service_name>.json. The config for 
cluster_paused is shown below:

Review Comment:
   ```suggestion
   Here, we are calling the `get_waiter` function defined in `base_aws.py` 
which takes an optional argument of `deferrable` (set to `True`), and the 
`aiobotocore` client. `cluster_paused` is a custom boto waiter defined in 
`redshift.json`  in the `airflow/providers/amazon/aws/waiters folder`. In 
general, the config file for a custom waiter should be named as 
<service_name>.json. The config for `cluster_paused` is shown below:
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to