amoghrajesh commented on code in PR #56762:
URL: https://github.com/apache/airflow/pull/56762#discussion_r2439981961


##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -812,6 +817,37 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
 API_SSL_CERT_PATH = conf.get("api", "ssl_cert")
 
 
+def _should_retry_api_request(exception: BaseException) -> bool:
+    """Determine if an API request should be retried based on the exception type."""
+    if isinstance(exception, httpx.HTTPStatusError):
+        status = exception.response.status_code
+        return status >= 500 or status == 429
+
+    # for all other httpx errors (network, timeout, connect, etc.), retry
+    if isinstance(exception, httpx.RequestError):
+        return True
+
+    return False
+
+
+def _get_retry_wait_time(retry_state) -> float:
+    """
+    Calculate wait time for retry, respecting Retry-After header on 429 responses.
+
+    For rate limit responses (429) with a Retry-After header, uses the value from the header.
+    Otherwise, falls back to exponential backoff with jitter.
+    """
+    exception = retry_state.outcome.exception()
+
+    if isinstance(exception, httpx.HTTPStatusError):
+        if exception.response.status_code == 429:
+            retry_after = exception.response.headers.get("Retry-After")
+            if retry_after and retry_after.isdigit():
+                return float(retry_after)
+
+    return wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)(retry_state)

Review Comment:
   From what I understand about stamina, it does not allow custom wait strategies (which is what we do for the 429 status code above). It can be worked around, but I don't know how important it is (can we accept losing that?)
   
   Anyway, the diff looks like this: 
   ```diff
   diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
   --- a/task-sdk/src/airflow/sdk/api/client.py (revision 9b7d5100c658492e2f91711e8ffe9d1e45e43ca9)
   +++ b/task-sdk/src/airflow/sdk/api/client.py (date 1760706304732)
   @@ -30,13 +30,7 @@
    import msgspec
    import structlog
    from pydantic import BaseModel
   -from tenacity import (
   -    before_log,
   -    retry,
   -    retry_if_exception,
   -    stop_after_attempt,
   -    wait_random_exponential,
   -)
   +import stamina
    from uuid6 import uuid7
    
    from airflow.configuration import conf
   @@ -830,24 +824,6 @@
        return False
    
    
   -def _get_retry_wait_time(retry_state) -> float:
   -    """
   -    Calculate wait time for retry, respecting Retry-After header on 429 responses.
   -
   -    For rate limit responses (429) with a Retry-After header, uses the value from the header.
   -    Otherwise, falls back to exponential backoff with jitter.
   -    """
   -    exception = retry_state.outcome.exception()
   -
   -    if isinstance(exception, httpx.HTTPStatusError):
   -        if exception.response.status_code == 429:
   -            retry_after = exception.response.headers.get("Retry-After")
   -            if retry_after and retry_after.isdigit():
   -                return float(retry_after)
   -
   -    return wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX)(retry_state)
   -
   -
    class Client(httpx.Client):
        def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, **kwargs: Any):
            if (not base_url) ^ dry_run:
   @@ -881,12 +857,12 @@
                log.debug("Execution API issued us a refreshed Task token")
                self.auth = BearerAuth(new_token)
    
   -    @retry(
   -        retry=retry_if_exception(_should_retry_api_request),
   -        stop=stop_after_attempt(7),
   -        wait=_get_retry_wait_time,
   -        before_sleep=before_log(log, logging.WARNING),
   -        reraise=True,
   +    @stamina.retry(
   +        on=_should_retry_api_request,
   +        attempts=API_RETRIES,
   +        wait_initial=API_RETRY_WAIT_MIN,
   +        wait_max=API_RETRY_WAIT_MAX,
   +        wait_jitter=1.0,
        )
        def request(self, *args, **kwargs):
            """Implement a convenience for httpx.Client.request with a retry layer."""
   
   ```
   
   So it's basically a very similar diff; it still needs `_should_retry_api_request`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to