astro-anand commented on code in PR #51463:
URL: https://github.com/apache/airflow/pull/51463#discussion_r2138277267
##########
providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py:
##########
@@ -300,10 +307,89 @@ async def get_sql_api_query_status_async(self, query_id:
str) -> dict[str, str |
"""
self.log.info("Retrieving status for query id %s", query_id)
header, params, url = self.get_request_url_header_params(query_id)
- async with (
- aiohttp.ClientSession(headers=header) as session,
- session.get(url, params=params) as response,
+ status_code, resp = await
self._make_api_call_with_retries_async("GET", url, header, params)
+ return self._process_response(status_code, resp)
+
+ @staticmethod
+ def _should_retry_on_error(exception) -> bool:
+ """
+ Determine if the exception should trigger a retry based on error type
and status code.
+
+ Retries on HTTP errors 429 (Too Many Requests), 503 (Service
Unavailable),
+ and 504 (Gateway Timeout) as recommended by Snowflake error handling
docs.
+ Retries on connection errors and timeouts.
+
+ :param exception: The exception to check
+ :return: True if the request should be retried, False otherwise
+ """
+ if isinstance(exception, HTTPError):
+ return exception.response.status_code in [429, 503, 504]
+ if isinstance(exception, ClientResponseError):
+ return exception.status in [429, 503, 504]
+ if isinstance(
+ exception,
+ (
+ ConnectionError,
+ Timeout,
+ ClientConnectionError,
+ ),
):
- status_code = response.status
- resp = await response.json()
- return self._process_response(status_code, resp)
+ return True
+ return False
+
+ def _make_api_call_with_retries(self, method, url, headers, params=None,
data=None):
+ """
+ Make an API call to the Snowflake SQL API with retry logic for
specific HTTP errors.
+
+ Error handling implemented based on Snowflake error handling docs:
+ https://docs.snowflake.com/en/developer-guide/sql-api/handling-errors
+
+ :param method: The HTTP method to use for the API call.
+ :param url: The URL for the API endpoint.
+ :param headers: The headers to include in the API call.
+ :param params: (Optional) The query parameters to include in the API
call.
+ :param data: (Optional) The data to include in the API call.
+ :return: The response object from the API call.
+ """
+
+ @tenacity.retry(**self.retry_config) # Use the retry args defined in
constructor
+ def _make_request():
+ if method.upper() == "GET":
+ response = requests.get(url, headers=headers, params=params)
+ elif method.upper() == "POST":
+ response = requests.post(url, headers=headers, params=params,
json=data)
+ else:
+ raise ValueError(f"Unsupported HTTP method: {method}")
+ response.raise_for_status()
+ return response.status_code, response.json()
+
+ return _make_request()
Review Comment:
Either approach works for me. The current version minimizes boilerplate, but if
the nested inner function isn't preferred, I'll update it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]