vincbeck commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1153311655


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job 
status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, 
e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is 
applied to the
+        delay .  It is generally recommended that random jitter is added to 
API requests.
+        A convenience method is provided for this, e.g. to get a random delay 
of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, 
minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, 
**kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job
+        monitor_job can raise an exception or an AirflowTaskTimeout can be 
raised if execution_timeout
+        is given while creating the task. These exceptions should be handled 
in taskinstance.py
+        instead of here like it was previously done
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:

Review Comment:
   I think leaving the option to the user to pass the waiters make everything 
more complicated no? Is there any reason why you want to do it that way? 
Another solution would be to implement a waiter for batch following [these 
instructions](https://github.com/apache/airflow/tree/main/airflow/providers/amazon/aws/waiters)
 and then use it in this hook. It would avoid having 2 branches (one with 
waiters passed as parameters, one with no waiter passed) and make the hook 
easier to use (no need to wonder whether we should pass some waiters). WDYT?



##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,238 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job 
status, i.e.
+        ``random.sample()``
+        Using a random interval helps to avoid AWS API throttle limits
+        when many concurrent tasks request job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, 
e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is 
applied to the
+        delay .  It is generally recommended that random jitter is added to 
API requests.
+        A convenience method is provided for this, e.g. to get a random delay 
of
+        10 sec +/- 5 sec: ``delay = BatchClient.add_jitter(10, width=5, 
minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, 
**kwargs: Any) -> None:

Review Comment:
   Docstring does not match parameters here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to