phanikumv commented on code in PR #29300:
URL: https://github.com/apache/airflow/pull/29300#discussion_r1152789008


##########
airflow/providers/amazon/aws/hooks/batch_client.py:
##########
@@ -544,3 +546,240 @@ def exp(tries):
         delay = 1 + pow(tries * 0.6, 2)
         delay = min(max_interval, delay)
         return uniform(delay / 3, delay)
+
+
+class BatchClientAsyncHook(BatchClientHook, AwsBaseAsyncHook):
+    """
+    Async client for AWS Batch services.
+
+    :param max_retries: exponential back-off retries, 4200 = 48 hours;
+        polling is only used when waiters is None
+
+    :param status_retries: number of HTTP retries to get job status, 10;
+        polling is only used when waiters is None
+
+    .. note::
+        Several methods use a default random delay to check or poll for job
+        status, i.e. ``random.sample()``. Using a random interval helps to
+        avoid AWS API throttle limits when many concurrent tasks request
+        job-descriptions.
+
+        To modify the global defaults for the range of jitter allowed when a
+        random delay is used to check Batch job status, modify these defaults, 
e.g.:
+
+            BatchClient.DEFAULT_DELAY_MIN = 0
+            BatchClient.DEFAULT_DELAY_MAX = 5
+
+        When explicit delay values are used, a 1 second random jitter is
+        applied to the delay. It is generally recommended that random jitter
+        is added to API requests. A convenience method is provided for this,
+        e.g. to get a random delay of 10 sec +/- 5 sec:
+        ``delay = BatchClient.add_jitter(10, width=5, minima=0)``
+    """
+
+    def __init__(self, job_id: str | None, waiters: Any = None, *args: Any, 
**kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.job_id = job_id
+        self.waiters = waiters
+
+    async def monitor_job(self) -> dict[str, str] | None:
+        """
+        Monitor an AWS Batch job.
+
+        monitor_job can raise an exception, or an AirflowTaskTimeout can be
+        raised if execution_timeout is given while creating the task. These
+        exceptions should be handled in taskinstance.py instead of here, as
+        was done previously.
+
+        :raises: AirflowException
+        """
+        if not self.job_id:
+            raise AirflowException("AWS Batch job - job_id was not found")
+
+        if self.waiters:
+            self.waiters.wait_for_job(self.job_id)
+            return None
+        else:
+            await self.wait_for_job(self.job_id)
+            await self.check_job_success(self.job_id)
+            success_msg = f"AWS Batch job ({self.job_id}) succeeded"
+            self.log.info(success_msg)
+            return {"status": "success", "message": success_msg}
+
+    async def check_job_success(self, job_id: str) -> bool:  # type: 
ignore[override]
+        """
+        Check the final status of the Batch job; return True if the job
+        'SUCCEEDED', else raise an AirflowException
+
+        :param job_id: a Batch job ID
+
+        :raises: AirflowException
+        """
+        job = await self.get_job_description(job_id)
+        job_status = job.get("status")
+        if job_status == self.SUCCESS_STATE:
+            self.log.info("AWS Batch job (%s) succeeded: %s", job_id, job)
+            return True
+
+        if job_status == self.FAILURE_STATE:
+            raise AirflowException(f"AWS Batch job ({job_id}) failed: {job}")
+
+        if job_status in self.INTERMEDIATE_STATES:
+            raise AirflowException(f"AWS Batch job ({job_id}) is not complete: 
{job}")
+
+        raise AirflowException(f"AWS Batch job ({job_id}) has unknown status: 
{job}")
+
+    @staticmethod
+    async def delay(delay: int | float | None = None) -> None:  # type: 
ignore[override]
+        """
+        Pause execution for ``delay`` seconds.
+
+        :param delay: a delay to pause execution using ``time.sleep(delay)``;
+            a small 1 second jitter is applied to the delay.
+
+        .. note::
+            This method uses a default random delay, i.e.
+            ``random.sample()``;
+            using a random interval helps to avoid AWS API throttle limits
+            when many concurrent tasks request job-descriptions.
+        """
+        if delay is None:
+            delay = sample(
+                list(range(BatchClientAsyncHook.DEFAULT_DELAY_MIN, 
BatchClientAsyncHook.DEFAULT_DELAY_MAX)), 1
+            )[0]

Review Comment:
   ```suggestion
               delay = uniform(BatchClientAsyncHook.DEFAULT_DELAY_MIN, 
BatchClientAsyncHook.DEFAULT_DELAY_MAX)
   ```
   
The code in `astronomer-providers` was introduced to pass the `BanditCheck`; that workaround is not needed here, and the `uniform` method should be used instead.



##########
airflow/providers/amazon/aws/operators/batch.py:
##########
@@ -71,6 +72,7 @@ class BatchOperator(BaseOperator):
         Override the region_name in connection (if provided)
     :param tags: collection of tags to apply to the AWS Batch job submission
         if None, no tags are submitted
+    :param deferrable: Run operator in the deferrable mode.

Review Comment:
   Can you please add docs describing the new param `deferrable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to