This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aee7ad27dec Fix Worker crash on API Server Error (#48517)
aee7ad27dec is described below

commit aee7ad27dec9ad62a305e9e827d6df9c180b3031
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Mar 29 04:53:08 2025 +0530

    Fix Worker crash on API Server Error (#48517)
    
    closes https://github.com/apache/airflow/issues/47873
    
    closes https://github.com/apache/airflow/issues/48503
---
 task-sdk/src/airflow/sdk/api/client.py     |  4 +++
 task-sdk/tests/task_sdk/api/test_client.py | 54 ++++++++++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index a6d757101fb..3b76d2e5ce6 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -598,6 +598,10 @@ class ServerResponseError(httpx.HTTPStatusError):
 
     detail: list[RemoteValidationError] | str | dict[str, Any] | None
 
+    def __reduce__(self) -> tuple[Any, ...]:
+        # Needed because https://github.com/encode/httpx/pull/3108 isn't merged yet.
+        return Exception.__new__, (type(self),) + self.args, self.__dict__
+
     @classmethod
     def from_response(cls, response: httpx.Response) -> ServerResponseError | None:
         if response.is_success:
diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py
index 409c140972d..36cf96781e6 100644
--- a/task-sdk/tests/task_sdk/api/test_client.py
+++ b/task-sdk/tests/task_sdk/api/test_client.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import json
+import pickle
 from unittest import mock
 
 import httpx
@@ -108,6 +109,29 @@ class TestClient:
         assert err.value.args == ("Not found",)
         assert err.value.detail is None
 
+    def test_server_response_error_pickling(self):
+        responses = [httpx.Response(404, json={"detail": {"message": "Invalid input"}})]
+        client = make_client_w_responses(responses)
+
+        with pytest.raises(ServerResponseError) as exc_info:
+            client.get("http://error")
+
+        err = exc_info.value
+        assert err.args == ("Server returned error",)
+        assert err.detail == {"detail": {"message": "Invalid input"}}
+
+        # Check that the error is picklable
+        pickled = pickle.dumps(err)
+        unpickled = pickle.loads(pickled)
+
+        assert isinstance(unpickled, ServerResponseError)
+
+        # Test that unpickled error has the same attributes as the original
+        assert unpickled.response.json() == {"detail": {"message": "Invalid input"}}
+        assert unpickled.detail == {"detail": {"message": "Invalid input"}}
+        assert unpickled.response.status_code == 404
+        assert unpickled.request.url == "http://error"
+
     @mock.patch("time.sleep", return_value=None)
     def test_retry_handling_unrecoverable_error(self, mock_sleep):
         responses: list[httpx.Response] = [
@@ -865,3 +889,33 @@ class TestTaskRescheduleOperations:
 
         assert isinstance(result, TaskRescheduleStartDate)
         assert result.start_date == "2024-01-01T00:00:00Z"
+
+
+# def test_server_response_error_pickling():
+#     """Test that ServerResponseError can be pickled and unpickled."""
+#     def handle_request(request: httpx.Request) -> httpx.Response:
+#         return httpx.Response(
+#             status_code=400,
+#             request=request,
+#             json={"detail": {"message": "Invalid input"}},
+#         )
+#
+#     client = make_client(transport=httpx.MockTransport(handle_request))
+#
+#     # Create an error by making a request that will fail
+#     with pytest.raises(ServerResponseError) as exc_info:
+#         client.get("http://fake-url-for-testing")
+#
+#     err = exc_info.value
+#
+#     # Pickle and unpickle
+#     pickled = pickle.dumps(err)
+#     unpickled = pickle.loads(pickled)
+#
+#     assert isinstance(unpickled, ServerResponseError)
+#
+#     # Error message from response
+#     assert unpickled.response.json() == {"detail": {"message": "Invalid input"}}
+#     assert unpickled.detail == {"detail": {"message": "Invalid input"}}
+#     assert unpickled.response.status_code == 400
+#     assert unpickled.request.url == "http://fake-url-for-testing"

Reply via email to