vandonr-amz commented on code in PR #32055:
URL: https://github.com/apache/airflow/pull/32055#discussion_r1238780127


##########
airflow/providers/amazon/aws/utils/waiter_with_logging.py:
##########
@@ -68,23 +70,81 @@ def wait(
         except WaiterError as error:
             if "terminal failure" in str(error):
                 raise AirflowException(f"{failure_message}: {error}")
-            status_string = _format_status_string(status_args, error.last_response)
-            log.info("%s: %s", status_message, status_string)
+            log.info("%s: %s", status_message, _LazyStatusFormatter(status_args, error.last_response))
             time.sleep(waiter_delay)
 
             if attempt >= max_attempts:
                 raise AirflowException("Waiter error: max attempts reached")
 
 
-def _format_status_string(args, response):
+async def async_wait(
+    waiter: Waiter,
+    waiter_delay: int,
+    max_attempts: int,
+    args: dict[str, Any],
+    failure_message: str,
+    status_message: str,
+    status_args: list[str],
+):
     """
-    Loops through the supplied args list and generates a string
-    which contains values from the waiter response.
+    Use an async boto waiter to poll an AWS service for the specified state. Although this function
+    uses boto waiters to poll the state of the service, it logs the response of the service
+    after every attempt, which is not currently supported by boto waiters.
+
+    :param waiter: The boto waiter to use.
+    :param waiter_delay: The amount of time in seconds to wait between attempts.
+    :param max_attempts: The maximum number of attempts to be made.
+    :param args: The arguments to pass to the waiter.
+    :param failure_message: The message to log if a failure state is reached.
+    :param status_message: The message logged when printing the status of the service.
+    :param status_args: A list containing the JMESPath queries to retrieve status information from
+        the waiter response.
+        e.g.
+        response = {"Cluster": {"state": "CREATING"}}
+        status_args = ["Cluster.state"]
+
+        response = {
+        "Clusters": [{"state": "CREATING", "details": "User initiated."},]
+        }
+        status_args = ["Clusters[0].state", "Clusters[0].details"]
+    """
+    log = logging.getLogger(__name__)
+    attempt = 0
+    while True:
+        attempt += 1
+        try:
+            await waiter.wait(**args, WaiterConfig={"MaxAttempts": 1})
+            break
+        except WaiterError as error:
+            if "terminal failure" in str(error):
+                raise AirflowException(f"{failure_message}: {error}")
+            log.info("%s: %s", status_message, _LazyStatusFormatter(status_args, error.last_response))
+            await asyncio.sleep(waiter_delay)
+
+            if attempt >= max_attempts:
+                raise AirflowException("Waiter error: max attempts reached")

Review Comment:
   Indeed, I'll change the synchronous version as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to