This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aee7ad27dec Fix Worker crash on API Server Error (#48517)
aee7ad27dec is described below
commit aee7ad27dec9ad62a305e9e827d6df9c180b3031
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Mar 29 04:53:08 2025 +0530
Fix Worker crash on API Server Error (#48517)
closes https://github.com/apache/airflow/issues/47873
closes https://github.com/apache/airflow/issues/48503
---
task-sdk/src/airflow/sdk/api/client.py | 4 +++
task-sdk/tests/task_sdk/api/test_client.py | 54 ++++++++++++++++++++++++++++++
2 files changed, 58 insertions(+)
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index a6d757101fb..3b76d2e5ce6 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -598,6 +598,10 @@ class ServerResponseError(httpx.HTTPStatusError):
detail: list[RemoteValidationError] | str | dict[str, Any] | None
+ def __reduce__(self) -> tuple[Any, ...]:
+ # Needed because https://github.com/encode/httpx/pull/3108 isn't
merged yet.
+ return Exception.__new__, (type(self),) + self.args, self.__dict__
+
@classmethod
def from_response(cls, response: httpx.Response) -> ServerResponseError |
None:
if response.is_success:
diff --git a/task-sdk/tests/task_sdk/api/test_client.py
b/task-sdk/tests/task_sdk/api/test_client.py
index 409c140972d..36cf96781e6 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import json
+import pickle
from unittest import mock
import httpx
@@ -108,6 +109,29 @@ class TestClient:
assert err.value.args == ("Not found",)
assert err.value.detail is None
+ def test_server_response_error_pickling(self):
+ responses = [httpx.Response(404, json={"detail": {"message": "Invalid
input"}})]
+ client = make_client_w_responses(responses)
+
+ with pytest.raises(ServerResponseError) as exc_info:
+ client.get("http://error")
+
+ err = exc_info.value
+ assert err.args == ("Server returned error",)
+ assert err.detail == {"detail": {"message": "Invalid input"}}
+
+ # Check that the error is picklable
+ pickled = pickle.dumps(err)
+ unpickled = pickle.loads(pickled)
+
+ assert isinstance(unpickled, ServerResponseError)
+
+ # Test that unpickled error has the same attributes as the original
+ assert unpickled.response.json() == {"detail": {"message": "Invalid
input"}}
+ assert unpickled.detail == {"detail": {"message": "Invalid input"}}
+ assert unpickled.response.status_code == 404
+ assert unpickled.request.url == "http://error"
+
@mock.patch("time.sleep", return_value=None)
def test_retry_handling_unrecoverable_error(self, mock_sleep):
responses: list[httpx.Response] = [
@@ -865,3 +889,33 @@ class TestTaskRescheduleOperations:
assert isinstance(result, TaskRescheduleStartDate)
assert result.start_date == "2024-01-01T00:00:00Z"
+
+
+# def test_server_response_error_pickling():
+# """Test that ServerResponseError can be pickled and unpickled."""
+# def handle_request(request: httpx.Request) -> httpx.Response:
+# return httpx.Response(
+# status_code=400,
+# request=request,
+# json={"detail": {"message": "Invalid input"}},
+# )
+#
+# client = make_client(transport=httpx.MockTransport(handle_request))
+#
+# # Create an error by making a request that will fail
+# with pytest.raises(ServerResponseError) as exc_info:
+# client.get("http://fake-url-for-testing")
+#
+# err = exc_info.value
+#
+# # Pickle and unpickle
+# pickled = pickle.dumps(err)
+# unpickled = pickle.loads(pickled)
+#
+# assert isinstance(unpickled, ServerResponseError)
+#
+# # Error message from response
+# assert unpickled.response.json() == {"detail": {"message": "Invalid
input"}}
+# assert unpickled.detail == {"detail": {"message": "Invalid input"}}
+# assert unpickled.response.status_code == 400
+# assert unpickled.request.url == "http://fake-url-for-testing"