This is an automated email from the ASF dual-hosted git repository.
xiazcy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master by this push:
new 8e7f8fbf92 Add byte buffer to improve gremlin-python performance (#3422)
8e7f8fbf92 is described below
commit 8e7f8fbf922ebb23d110594627215113d5708ca5
Author: Guian Gumpac <[email protected]>
AuthorDate: Wed May 13 14:29:57 2026 -0700
Add byte buffer to improve gremlin-python performance (#3422)
Add byte buffer to improve performance
Assisted-by: Devin: Claude Opus 4.7
---
.../gremlin_python/driver/aiohttp/transport.py | 35 +++++++++++++++-
.../tests/unit/driver/test_http_streaming.py | 47 +++++++++++++++-------
2 files changed, 66 insertions(+), 16 deletions(-)
diff --git
a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
index 1a992342bd..08a2462427 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
@@ -30,17 +30,48 @@ __author__ = 'Lyndon Bauto ([email protected])'
class AiohttpSyncStream:
    """Wraps aiohttp's async StreamReader as a synchronous file-like object.

    read(n) blocks until exactly n bytes are available from the HTTP response.

    Maintains an internal byte buffer that is refilled one HTTP chunk at a time
    so the deserializer's many small read(n) calls don't each pay the cost of a
    full asyncio event-loop turn. Already-consumed bytes are discarded before
    every refill, so the buffer never holds more than one fill's worth of data
    plus the (fewer than n) unread bytes left over from the previous fill.
    """

    # Upper bound on the bytes requested from the response per underlying
    # read(); every fill costs one event-loop turn. NOTE(review): chosen to
    # match aiohttp.StreamReader's default 64 KB buffer limit -- confirm
    # whether a larger request could ever be satisfied by a single call.
    _FILL_SIZE = 64 * 1024

    def __init__(self, response, loop, read_timeout):
        """
        :param response: aiohttp response whose ``content`` stream is read.
        :param loop: event loop used to run each async read to completion.
        :param read_timeout: timeout handed to ``async_timeout.timeout()`` for
            each underlying read.
        """
        self._response = response
        self._loop = loop
        self._read_timeout = read_timeout
        # Buffered response bytes; only self._buf[self._pos:] is still unread.
        self._buf = bytearray()
        self._pos = 0

    def read(self, n):
        """Return exactly ``n`` bytes from the response stream.

        Blocks, driving the event loop, until enough data has been buffered.

        :param n: number of bytes wanted; ``n <= 0`` returns ``b''``.
        :raises asyncio.IncompleteReadError: if the stream reaches EOF before
            ``n`` bytes are available. ``partial`` holds the unread bytes that
            were buffered, and the buffer is reset.
        :raises Exception: whatever the underlying read raises (e.g. a timeout
            from ``async_timeout``) propagates unchanged; buffered data is kept.
        """
        if n <= 0:
            return b''
        while len(self._buf) - self._pos < n:
            data = self._read_chunk()
            if not data:
                # EOF (empty chunk) before n bytes arrived.
                partial = bytes(self._buf[self._pos:])
                self._buf.clear()
                self._pos = 0
                raise asyncio.IncompleteReadError(partial=partial, expected=n)
            # Drop the consumed prefix before appending. Otherwise the buffer
            # is only reclaimed when a read ends exactly on the end of the
            # buffered data, and it can grow toward the size of the response.
            if self._pos:
                del self._buf[:self._pos]
                self._pos = 0
            self._buf.extend(data)
        end = self._pos + n
        out = bytes(self._buf[self._pos:end])
        self._pos = end
        # Reclaim memory once the buffer is fully drained
        if self._pos == len(self._buf):
            self._buf.clear()
            self._pos = 0
        return out

    def _read_chunk(self):
        """Return up to ``_FILL_SIZE`` bytes from the response; ``b''`` at EOF."""
        async def _read():
            async with async_timeout.timeout(self._read_timeout):
                return await self._response.content.read(self._FILL_SIZE)
        return self._loop.run_until_complete(_read())
diff --git
a/gremlin-python/src/main/python/tests/unit/driver/test_http_streaming.py
b/gremlin-python/src/main/python/tests/unit/driver/test_http_streaming.py
index c774df7406..23415fdef3 100644
--- a/gremlin-python/src/main/python/tests/unit/driver/test_http_streaming.py
+++ b/gremlin-python/src/main/python/tests/unit/driver/test_http_streaming.py
@@ -157,7 +157,9 @@ class TestAiohttpSyncStream:
The class should:
- Have a read(n) method that blocks until exactly n bytes are available
- - Bridge async readexactly(n) to sync via loop.run_until_complete()
+ - Refill its internal buffer one HTTP chunk at a time so the
+ deserializer's many small read(n) calls don't each cost a full
+ asyncio event-loop turn
- Raise on timeout
- Raise asyncio.IncompleteReadError on premature disconnect
"""
@@ -169,13 +171,12 @@ class TestAiohttpSyncStream:
loop = asyncio.new_event_loop()
mock_response = MagicMock()
mock_response.content = MagicMock()
- mock_response.content.readexactly =
AsyncMock(return_value=b'\x01\x02\x03\x04')
+ mock_response.content.read =
AsyncMock(side_effect=[b'\x01\x02\x03\x04', b''])
stream = AiohttpSyncStream(mock_response, loop, read_timeout=30)
result = stream.read(4)
assert result == b'\x01\x02\x03\x04'
- mock_response.content.readexactly.assert_awaited_once_with(4)
loop.close()
def test_read_single_byte(self):
@@ -185,7 +186,7 @@ class TestAiohttpSyncStream:
loop = asyncio.new_event_loop()
mock_response = MagicMock()
mock_response.content = MagicMock()
- mock_response.content.readexactly = AsyncMock(return_value=b'\x84')
+ mock_response.content.read = AsyncMock(side_effect=[b'\x84', b''])
stream = AiohttpSyncStream(mock_response, loop, read_timeout=30)
result = stream.read(1)
@@ -194,35 +195,53 @@ class TestAiohttpSyncStream:
loop.close()
def test_read_multiple_sequential_calls(self):
    """Multiple read() calls should be served from a single buffered chunk."""
    from gremlin_python.driver.aiohttp.transport import AiohttpSyncStream

    loop = asyncio.new_event_loop()
    # Close the loop even when an assertion fails, so a failing test does not
    # leak an open event loop into the rest of the run.
    try:
        mock_response = MagicMock()
        mock_response.content = MagicMock()
        # The whole payload arrives in one chunk; subsequent calls return EOF.
        mock_response.content.read = AsyncMock(
            side_effect=[b'\x84\x00\x01\x02\x03\x04', b''])

        stream = AiohttpSyncStream(mock_response, loop, read_timeout=30)
        assert stream.read(1) == b'\x84'
        assert stream.read(1) == b'\x00'
        assert stream.read(4) == b'\x01\x02\x03\x04'
        # Only one underlying read was needed for three user-level read() calls
        assert mock_response.content.read.await_count == 1
    finally:
        loop.close()
+
def test_read_refills_buffer_across_chunks(self):
    """read(n) should refill from the underlying stream when the buffer is short."""
    from gremlin_python.driver.aiohttp.transport import AiohttpSyncStream

    loop = asyncio.new_event_loop()
    # Close the loop even when an assertion fails.
    try:
        mock_response = MagicMock()
        mock_response.content = MagicMock()
        # Data arrives in two chunks; read(6) must span both.
        mock_response.content.read = AsyncMock(
            side_effect=[b'\x01\x02\x03', b'\x04\x05\x06', b''])

        stream = AiohttpSyncStream(mock_response, loop, read_timeout=30)
        assert stream.read(6) == b'\x01\x02\x03\x04\x05\x06'
        assert mock_response.content.read.await_count == 2
    finally:
        loop.close()
def test_read_raises_on_incomplete_read(self):
    """read() should raise IncompleteReadError when the server disconnects mid-stream."""
    from gremlin_python.driver.aiohttp.transport import AiohttpSyncStream

    loop = asyncio.new_event_loop()
    # Close the loop even when an assertion fails.
    try:
        mock_response = MagicMock()
        mock_response.content = MagicMock()
        # First chunk delivers one byte, then EOF -- caller asked for four.
        mock_response.content.read = AsyncMock(side_effect=[b'\x01', b''])

        stream = AiohttpSyncStream(mock_response, loop, read_timeout=30)
        with pytest.raises(asyncio.IncompleteReadError) as exc_info:
            stream.read(4)
        # The bytes already buffered must be reported, not silently dropped.
        assert exc_info.value.partial == b'\x01'
        assert exc_info.value.expected == 4
    finally:
        loop.close()
def test_read_raises_on_timeout(self):
@@ -232,7 +251,7 @@ class TestAiohttpSyncStream:
loop = asyncio.new_event_loop()
mock_response = MagicMock()
mock_response.content = MagicMock()
- mock_response.content.readexactly =
AsyncMock(side_effect=asyncio.TimeoutError())
+ mock_response.content.read =
AsyncMock(side_effect=asyncio.TimeoutError())
stream = AiohttpSyncStream(mock_response, loop, read_timeout=1)
with pytest.raises(asyncio.TimeoutError):
@@ -268,7 +287,7 @@ class TestTransportGetStream:
transport._read_timeout = 30
mock_resp = MagicMock()
mock_resp.content = MagicMock()
- mock_resp.content.readexactly = AsyncMock(return_value=b'\x84')
+ mock_resp.content.read = AsyncMock(side_effect=[b'\x84', b''])
transport._http_req_resp = mock_resp
stream = transport.get_stream()