This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 615a488163b Handle HTTP 422 responses bypass to _process_response
instead of raising immediately in Snowflake SQL API hook (#60891)
615a488163b is described below
commit 615a488163b610fa602fd339cc6aae6523fb3594
Author: HyunWoo Oh <[email protected]>
AuthorDate: Tue Feb 10 22:31:23 2026 +0900
Handle HTTP 422 responses bypass to _process_response instead of raising
immediately in Snowflake SQL API hook (#60891)
---
.../providers/snowflake/hooks/snowflake_sql_api.py | 13 +-
.../unit/snowflake/hooks/test_snowflake_sql_api.py | 142 ++++++++++++++++++---
2 files changed, 138 insertions(+), 17 deletions(-)
diff --git
a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
index c3521e03641..f5f34b34153 100644
---
a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
+++
b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
@@ -476,6 +476,13 @@ class SnowflakeSqlApiHook(SnowflakeHook):
return True
return False
+ @staticmethod
+ def _should_raise_for_status(status: int) -> bool:
+ # _process_response handles HTTP 422 to provide richer error context.
+ # The response payload must be passed through even when the status is
422.
+ # See https://docs.snowflake.com/en/developer-guide/sql-api/reference
+ return status >= 400 and status != 422
+
def _make_api_call_with_retries(
self, method: str, url: str, headers: dict, params: dict | None =
None, json: dict | None = None
):
@@ -516,7 +523,8 @@ class SnowflakeSqlApiHook(SnowflakeHook):
# user first, base second => base wins even if guard
misses something
request_kwargs: dict[str, Any] = {**user_kwargs,
**base_request_kwargs}
response = session.request(**request_kwargs)
- response.raise_for_status()
+ if self._should_raise_for_status(response.status_code):
+ response.raise_for_status()
return response.status_code, response.json()
async def _make_api_call_with_retries_async(self, method, url, headers,
params=None):
@@ -560,7 +568,8 @@ class SnowflakeSqlApiHook(SnowflakeHook):
}
request_kwargs: dict[str, Any] = {**user_request_kwargs,
**base_request_kwargs}
async with session.request(**request_kwargs) as response:
- response.raise_for_status()
+ if self._should_raise_for_status(response.status):
+ response.raise_for_status()
# Return status and json content for async processing
content = await response.json()
return response.status, content
diff --git
a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
index f553be54cf9..4e89a8900a4 100644
--- a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
+++ b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
@@ -20,6 +20,7 @@ import asyncio
import base64
import unittest
import uuid
+from collections.abc import Mapping
from typing import TYPE_CHECKING, Any
from unittest import mock
from unittest.mock import AsyncMock, call
@@ -180,13 +181,44 @@ def create_successful_response_mock(content):
return response
-def create_post_side_effect(status_code=429):
- """create mock response for post side effect"""
- response = mock.MagicMock()
- response.status_code = status_code
- response.reason = "test"
- response.raise_for_status.side_effect =
requests.exceptions.HTTPError(response=response)
- return response
+def create_post_response(
+ status_code: int = 429,
+ *,
+ json_body: Mapping[str, Any] | None = None,
+ reason: str = "test",
+ http_error: BaseException | None = None,
+ raise_for_status: bool | None = None,
+):
+ """
+ Build a mock response object for requests.request/post.
+ Defaults:
+ - 2xx/3xx: raise_for_status() does nothing.
+ - 4xx/5xx: raise_for_status() raises
requests.exceptions.HTTPError(response=resp).
+ Customization:
+ - json_body: controls resp.json() output.
+ - http_error: explicitly set what raise_for_status raises (overrides
default behavior).
+ - raise_for_status: force-enable/disable raising regardless of
status_code.
+ """
+ resp = mock.MagicMock()
+ resp.status_code = status_code
+ resp.reason = reason
+ resp.json.return_value = dict(json_body) if json_body is not None else {}
+
+ _default_should_raise = status_code >= 400
+
+ if raise_for_status is None:
+ should_raise = _default_should_raise
+ else:
+ should_raise = raise_for_status
+
+ if http_error is not None:
+ resp.raise_for_status.side_effect = http_error
+ elif should_raise:
+ resp.raise_for_status.side_effect =
requests.exceptions.HTTPError(response=resp)
+ else:
+ resp.raise_for_status.return_value = None
+
+ return resp
def create_async_request_client_response_error(request_info=None,
history=None, status_code=429):
@@ -203,9 +235,7 @@ def
create_async_request_client_response_error(request_info=None, history=None,
def create_async_connection_error():
- response = mock.MagicMock()
- response.raise_for_status.side_effect = aiohttp.ClientConnectionError()
- return response
+ return aiohttp.ClientConnectionError()
def create_async_request_client_response_success(json=GET_RESPONSE,
status_code=200):
@@ -290,12 +320,14 @@ class TestSnowflakeSqlApiHook:
("sql", "statement_count", "expected_response", "expected_query_ids"),
[(SINGLE_STMT, 1, {"statementHandle": "uuid"}, ["uuid"])],
)
+ @mock.patch(f"{HOOK_PATH}._make_api_call_with_retries")
@mock.patch(f"{HOOK_PATH}._get_conn_params")
@mock.patch(f"{HOOK_PATH}.get_headers")
def test_execute_query_exception_without_statement_handle(
self,
mock_get_header,
mock_conn_param,
+ mock_make_api_call,
sql,
statement_count,
expected_response,
@@ -306,13 +338,12 @@ class TestSnowflakeSqlApiHook:
Test execute_query method by mocking the exception response and raise
airflow exception
without statementHandle in the response
"""
- side_effect = create_post_side_effect()
- mock_requests.request.side_effect = side_effect
+ # status_code, json payload without statementHandle
+ mock_make_api_call.return_value = (None, {"foo": "bar"})
hook = SnowflakeSqlApiHook("mock_conn_id")
- with pytest.raises(AirflowException) as exception_info:
+ with pytest.raises(AirflowException):
hook.execute_query(sql, statement_count)
- assert exception_info
@pytest.mark.parametrize(
("sql", "statement_count", "bindings"),
@@ -358,6 +389,8 @@ class TestSnowflakeSqlApiHook:
params = {"requestId": str(req_id), "page": 2, "pageSize": 10}
mock_geturl_header_params.return_value = HEADERS, params,
"/test/airflow/"
mock_requests.request.return_value.json.return_value = GET_RESPONSE
+ # Make sure status code 200 when query is success.
+ mock_requests.request.return_value.status_code = 200
hook = SnowflakeSqlApiHook("mock_conn_id")
with mock.patch.object(hook.log, "info") as mock_log_info:
hook.check_query_output(query_ids)
@@ -382,7 +415,7 @@ class TestSnowflakeSqlApiHook:
"stop": tenacity.stop_after_attempt(2), # Only 2 attempts instead
of default 5
}
hook = SnowflakeSqlApiHook("mock_conn_id",
api_retry_args=custom_retry_args)
- mock_requests.request.side_effect =
[create_post_side_effect(status_code=500)] * 3
+ mock_requests.request.side_effect =
[create_post_response(status_code=500)] * 3
with pytest.raises(requests.exceptions.HTTPError):
hook.check_query_output(query_ids)
@@ -1514,6 +1547,40 @@ class TestSnowflakeSqlApiHook:
):
hook._make_api_call_with_retries("GET", API_URL, HEADERS)
+ def test_make_api_call_with_422_does_not_raise_for_status(self,
mock_requests):
+ """Test that HTTP 422 responses do not call raise_for_status and pass
through the response body."""
+ hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn")
+
+ response = create_post_response(
+ status_code=422, json_body={"code": "Query was failed when
runtime..", "message": "sync job"}
+ )
+ mock_requests.request.return_value = response
+
+ status, body = hook._make_api_call_with_retries("GET", API_URL,
HEADERS)
+
+ assert status == 422
+ assert body == {"code": "Query was failed when runtime..", "message":
"sync job"}
+
+ # Validate 422 don't raise http error.
+ response.raise_for_status.assert_not_called()
+ # Decode should call once.
+ response.json.assert_called_once()
+
+ def test_make_api_call_with_500_raises_for_status(self, mock_requests):
+ """Test that HTTP 500 responses call raise_for_status and do not pass
through the response body."""
+ hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn")
+
+ response = create_post_response(status_code=500, json_body={"error":
"internal error"})
+ mock_requests.request.return_value = response
+
+ with pytest.raises(requests.exceptions.HTTPError):
+ hook._make_api_call_with_retries("GET", API_URL, HEADERS)
+
+ # 500 status code should raise HTTPError.
+ response.raise_for_status.assert_called_once()
+ # After raise _make_api_call_with_retries will return control
immediately.
+ response.json.assert_not_called()
+
@pytest.mark.asyncio
async def
test_make_api_call_with_retries_async_passes_timeout_to_clientsession(self):
"""
@@ -1611,3 +1678,48 @@ class TestSnowflakeSqlApiHook:
match=r"aiohttp_request_kwargs must not override request identity
fields",
):
await hook._make_api_call_with_retries_async("GET", API_URL,
HEADERS)
+
+ @pytest.mark.asyncio
+ async def
test_make_api_call_with_422_does_not_raise_for_status_async(self,
mock_async_request):
+ """Test that HTTP 422 responses do not call raise_for_status and pass
through the response body (async)."""
+ hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn")
+
+ response = create_async_request_client_response_error(status_code=422)
+
+ # 422 should NOT call raise_for_status! We have to provide a valid
JSON payload
+ response.json = AsyncMock(
+ return_value={"code": "Query was failed when runtime..",
"message": "async job!"}
+ )
+
+ # If raise_for_status() is called, this side effect will explode the
test!
+ response.raise_for_status.side_effect = AssertionError(
+ "raise_for_status should not be called for 422"
+ )
+
+ mock_async_request.__aenter__.return_value = response
+
+ status, body = await hook._make_api_call_with_retries_async("GET",
API_URL, HEADERS)
+
+ assert status == 422
+ assert body == {"code": "Query was failed when runtime..", "message":
"async job!"}
+
+ response.raise_for_status.assert_not_called()
+ response.json.assert_awaited_once()
+
+ @pytest.mark.asyncio
+ async def test_make_api_call_with_500_raises_for_status_async(self,
mock_async_request):
+ """Test that HTTP 500 responses call raise_for_status and do not pass
through the response body (async)."""
+ hook = SnowflakeSqlApiHook(snowflake_conn_id="test_conn")
+
+ response = create_async_request_client_response_error(status_code=500)
+
+ # If json() is called, the test must fail..
+ response.json = AsyncMock(side_effect=AssertionError("json should not
be called on 500"))
+
+ mock_async_request.__aenter__.return_value = response
+
+ with pytest.raises(aiohttp.ClientResponseError):
+ await hook._make_api_call_with_retries_async("GET", API_URL,
HEADERS)
+
+ response.raise_for_status.assert_called_once()
+ response.json.assert_not_called()