mik-laj commented on code in PR #51463:
URL: https://github.com/apache/airflow/pull/51463#discussion_r2152672983
##########
providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py:
##########
@@ -300,10 +307,89 @@ async def get_sql_api_query_status_async(self, query_id:
str) -> dict[str, str |
"""
self.log.info("Retrieving status for query id %s", query_id)
header, params, url = self.get_request_url_header_params(query_id)
- async with (
- aiohttp.ClientSession(headers=header) as session,
- session.get(url, params=params) as response,
+ status_code, resp = await
self._make_api_call_with_retries_async("GET", url, header, params)
+ return self._process_response(status_code, resp)
+
+ @staticmethod
+ def _should_retry_on_error(exception) -> bool:
+ """
+ Determine if the exception should trigger a retry based on error type
and status code.
+
+ Retries on HTTP errors 429 (Too Many Requests), 503 (Service
Unavailable),
+ and 504 (Gateway Timeout) as recommended by Snowflake error handling
docs.
+ Retries on connection errors and timeouts.
+
+ :param exception: The exception to check
+ :return: True if the request should be retried, False otherwise
+ """
+ if isinstance(exception, HTTPError):
+ return exception.response.status_code in [429, 503, 504]
+ if isinstance(exception, ClientResponseError):
+ return exception.status in [429, 503, 504]
+ if isinstance(
+ exception,
+ (
+ ConnectionError,
+ Timeout,
+ ClientConnectionError,
+ ),
):
- status_code = response.status
- resp = await response.json()
- return self._process_response(status_code, resp)
+ return True
+ return False
+
+ def _make_api_call_with_retries(self, method, url, headers, params=None,
data=None):
+ """
+ Make an API call to the Snowflake SQL API with retry logic for
specific HTTP errors.
+
+ Error handling implemented based on Snowflake error handling docs:
+ https://docs.snowflake.com/en/developer-guide/sql-api/handling-errors
+
+ :param method: The HTTP method to use for the API call.
+ :param url: The URL for the API endpoint.
+ :param headers: The headers to include in the API call.
+ :param params: (Optional) The query parameters to include in the API
call.
+ :param data: (Optional) The data to include in the API call.
+ :return: The response object from the API call.
+ """
+
+ @tenacity.retry(**self.retry_config) # Use the retry args defined in
constructor
+ def _make_request():
+ if method.upper() == "GET":
+ response = requests.get(url, headers=headers, params=params)
+ elif method.upper() == "POST":
+ response = requests.post(url, headers=headers, params=params,
json=data)
Review Comment:
I have the impression that the requests library correctly opens a new TCP
session if it is closed too early.
In our case, the best solution will be something similar to the following:
```
with requests.Session() as session:
    for attempt in Retrying(**self.retry_config):
        with attempt:
            response = session.request(
                method=method.lower(), url=url, headers=headers,
                params=params, json=data
            )
            response.raise_for_status()
            return response.status_code, response.json()
```
We have several advantages here:
1. We reuse the requests session, and therefore also the TCP connection, when
possible. The session is also closed correctly because we use the
`requests.Session()` context manager.
2. We don't have inner functions, so it's easier to follow the lifetime of
objects.
3. We don't have repeated validation of the `method` parameter, which
already exists in the session.request function
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]