This is an automated email from the ASF dual-hosted git repository. Cole-Greer pushed a commit to branch GLVBehaviouralAlignment in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 41aca1ea4409de5f0baea8b3106ca14600aba834 Author: Cole Greer <[email protected]> AuthorDate: Tue Jun 2 12:37:34 2026 -0700 Evict dead connections and wrap transport errors in gremlin-python - Release the aiohttp response on error so a half-closed connection is evicted and the same client recovers after an empty response body (tinkerpop-sti). - Wrap low-level aiohttp transport errors in GremlinConnectionError with an actionable message on both the request and response paths (tinkerpop-1fn). - Use a distinct "Server returned an empty response body" message for the empty-body case (tinkerpop-7s3). - Tighten behavioral test assertions. --- .../gremlin_python/driver/aiohttp/transport.py | 6 ++++ .../python/gremlin_python/driver/connection.py | 33 ++++++++++++++++++++++ .../integration/driver/test_client_behavior.py | 21 +++++++------- 3 files changed, 49 insertions(+), 11 deletions(-) diff --git a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py index 08a2462427..03da916ca7 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py @@ -171,6 +171,12 @@ class AiohttpHTTPTransport: return await self._http_req_resp.read() return self._loop.run_until_complete(_read()) + def release_response(self): + """Release the current HTTP response, returning its connection to aiohttp's pool.""" + if self._http_req_resp is not None: + self._http_req_resp.close() + self._http_req_resp = None + def close(self): # Inner function to perform async close. async def async_close(): diff --git a/gremlin-python/src/main/python/gremlin_python/driver/connection.py b/gremlin-python/src/main/python/gremlin_python/driver/connection.py index 64a3f00081..f835476953 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/connection.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/connection.py @@ -14,14 +14,28 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import asyncio import queue from concurrent.futures import Future +from aiohttp.client_exceptions import ( + ClientConnectionError, + ClientPayloadError, + ServerDisconnectedError, +) + from gremlin_python.driver import resultset, useragent from gremlin_python.driver.aiohttp.transport import AiohttpHTTPTransport __author__ = 'David M. Brown ([email protected])' +_TRANSPORT_ERRORS = (ClientConnectionError, ClientPayloadError, ServerDisconnectedError, asyncio.IncompleteReadError) + +_CONNECTION_ERROR_MSG = ( + "Connection to server closed unexpectedly. " + "Ensure that the server is still reachable and the connection has not been closed by the server or a network device." +) + class GremlinServerError(Exception): def __init__(self, status): @@ -31,6 +45,11 @@ class GremlinServerError(Exception): self.status_exception = status['exception'] +class GremlinConnectionError(Exception): + """Raised when a transport-level failure occurs communicating with the server.""" + pass + + class Connection: def __init__(self, url, traversal_source, @@ -108,6 +127,11 @@ class Connection: def cb(f): try: f.result() + except _TRANSPORT_ERRORS as e: + wrapped = GremlinConnectionError(_CONNECTION_ERROR_MSG) + wrapped.__cause__ = e + future.set_exception(wrapped) + self._pool.put_nowait(self) except Exception as e: future.set_exception(e) self._pool.put_nowait(self) @@ -145,6 +169,15 @@ class Connection: stream = self._transport.get_stream() for obj in self._response_serializer.deserialize_response_stream(stream): self._result_set.stream.put_nowait(obj) + except _TRANSPORT_ERRORS as err: + # Release the response to evict the dead connection from aiohttp's + # internal pool, ensuring subsequent requests get a fresh connection. + self._transport.release_response() + msg = 'Server returned an empty response body' if isinstance(err, asyncio.IncompleteReadError) and not err.partial else _CONNECTION_ERROR_MSG + raise GremlinConnectionError(msg) from err + except Exception: + self._transport.release_response() + raise finally: self._pool.put_nowait(self) diff --git a/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py b/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py index 149f2735f4..7c9787e33d 100644 --- a/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py +++ b/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py @@ -23,10 +23,9 @@ import time from concurrent.futures import ThreadPoolExecutor, as_completed import pytest -from aiohttp.client_exceptions import ClientPayloadError, ServerDisconnectedError from gremlin_python.driver.client import Client -from gremlin_python.driver.connection import GremlinServerError +from gremlin_python.driver.connection import GremlinConnectionError, GremlinServerError from gremlin_python.driver.serializer import GraphBinarySerializersV4 from .socket_server_constants import ( @@ -70,7 +69,7 @@ def test_should_receive_single_vertex(socket_server_client): def test_should_handle_server_closing_connection_before_response(socket_server_client): - with pytest.raises(ServerDisconnectedError, match="Server disconnected"): + with pytest.raises(GremlinConnectionError, match="Connection to server closed unexpectedly"): socket_server_client.submit(GREMLIN_CLOSE_CONNECTION).all().result() # Recovery @@ -97,7 +96,7 @@ def test_should_handle_server_error_after_delay(socket_server_client): def test_should_handle_partial_content_close(socket_server_client): - with pytest.raises(ClientPayloadError, match="payload is not completed"): + with pytest.raises(GremlinConnectionError, match="Connection to server closed unexpectedly"): socket_server_client.submit(GREMLIN_PARTIAL_CONTENT_CLOSE).all().result() # Recovery @@ -117,15 +116,15 @@ def test_should_handle_malformed_response(socket_server_client): def test_should_handle_empty_response_body(fresh_client): - # An empty HTTP response body should surface as an error rather than hang. - with pytest.raises(asyncio.IncompleteReadError): + # An empty HTTP response body should surface as a GremlinConnectionError + # wrapping the underlying IncompleteReadError. + with pytest.raises(GremlinConnectionError, match="Server returned an empty response body"): fresh_client.submit(GREMLIN_EMPTY_BODY).all().result() - # NOTE: Unlike the Java driver, the Python (aiohttp) driver does not recover - # on the same client after an empty response body - the half-closed connection - # is not evicted from the pool and a subsequent request fails with - # 'Cannot write to closing transport'. This driver gap is flagged in the - # cross-GLV error-message audit (tinkerpop-8lw.6) for further consideration. + # Recovery on the same client - the dead connection should be evicted from + # aiohttp's internal pool so subsequent requests get a fresh connection. + result = fresh_client.submit(GREMLIN_SINGLE_VERTEX).all().result() + assert len(result) == 1 def test_should_handle_slow_response(socket_server_client):
