This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-http-final in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 4fa8a353efd419b808487b6003fee44da18e8e22 Author: Yang Xia <55853655+xia...@users.noreply.github.com> AuthorDate: Thu Oct 31 16:28:05 2024 -0700 change from streaming deserialization to aggregating data and make graphson class naming consistent (#2873) --- gremlin-python/docker-compose.yml | 3 ++- .../gremlin_python/driver/aiohttp/transport.py | 22 ++++++++++++++- .../python/gremlin_python/driver/connection.py | 8 +++++- .../main/python/gremlin_python/driver/protocol.py | 31 ++++++++++++++++++++++ .../python/gremlin_python/driver/serializer.py | 4 +-- gremlin-python/src/main/python/radish/terrain.py | 2 ++ gremlin-python/src/main/python/tests/conftest.py | 11 ++++++-- .../tests/driver/test_driver_remote_connection.py | 4 +-- 8 files changed, 76 insertions(+), 9 deletions(-) diff --git a/gremlin-python/docker-compose.yml b/gremlin-python/docker-compose.yml index f302c54823..b912c907cc 100644 --- a/gremlin-python/docker-compose.yml +++ b/gremlin-python/docker-compose.yml @@ -64,7 +64,8 @@ services: && python3 ./setup.py test && python3 ./setup.py install && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0' --user-data='bulked=true' - && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0'; + && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0' + && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.gremlin-v4.0+json'; EXIT_CODE=$$?; chown -R `stat -c "%u:%g" .` .; exit $$EXIT_CODE" depends_on: gremlin-server-test-python: diff --git a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py index b2827e9085..399f6806b8 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py @@ -105,7 +105,27 @@ class AiohttpHTTPTransport(AbstractBaseTransport): def read(self, stream_chunk=None): if not stream_chunk: - raise Exception('missing handling of streamed responses to protocol') + ''' + GraphSON does not support streaming deserialization, we are aggregating data and bypassing streamed + deserialization while GraphSON is enabled for testing. Remove after GraphSON is removed. + ''' + async def async_read(): + async with async_timeout.timeout(self._read_timeout): + data_buffer = b"" + async for data, end_of_http_chunk in self._http_req_resp.content.iter_chunks(): + try: + data_buffer += data + except ClientPayloadError: + # server disconnect during streaming will cause ClientPayLoadError from aiohttp + raise GremlinServerError({'code': 500, + 'message': 'Server disconnected - please try to reconnect', + 'exception': ClientPayloadError}) + if self._max_content_len and len( + data_buffer) > self._max_content_len: + raise Exception(f'Response size {len(data_buffer)} exceeds limit {self._max_content_len} bytes') + return data_buffer + return self._loop.run_until_complete(async_read()) + # raise Exception('missing handling of streamed responses to protocol') # Inner function to perform async read. async def async_read(): diff --git a/gremlin-python/src/main/python/gremlin_python/driver/connection.py b/gremlin-python/src/main/python/gremlin_python/driver/connection.py index cb34923d0c..434272c5c5 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/connection.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/connection.py @@ -84,7 +84,13 @@ class Connection: def _receive(self): try: - self._transport.read(self.stream_chunk) + ''' + GraphSON does not support streaming deserialization, we are aggregating data and bypassing streamed + deserialization while GraphSON is enabled for testing. Remove after GraphSON is removed. + ''' + self._protocol.data_received_aggregate(self._transport.read(), self._result_set) + # re-enable streaming after graphSON removal + # self._transport.read(self.stream_chunk) finally: self._pool.put_nowait(self) diff --git a/gremlin-python/src/main/python/gremlin_python/driver/protocol.py b/gremlin-python/src/main/python/gremlin_python/driver/protocol.py index 930d2de328..48c709de17 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/protocol.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/protocol.py @@ -95,6 +95,37 @@ class GremlinServerHTTPProtocol(AbstractBaseProtocol): self._transport.write(message) + ''' + GraphSON does not support streaming deserialization, we are aggregating data and bypassing streamed + deserialization while GraphSON is enabled for testing. Remove after GraphSON is removed. + ''' + def data_received_aggregate(self, response, result_set): + response_msg = {'status': {'code': 0, + 'message': '', + 'exception': ''}, + 'result': {'meta': {}, + 'data': []}} + + response_msg = self._decode_chunk(response_msg, response, self._is_first_chunk) + + self._is_first_chunk = False + status_code = response_msg['status']['code'] + aggregate_to = response_msg['result']['meta'].get('aggregateTo', 'list') + data = response_msg['result']['data'] + result_set.aggregate_to = aggregate_to + self._is_first_chunk = True + + if status_code == 204 and len(data) == 0: + result_set.stream.put_nowait([]) + elif status_code in [200, 204, 206]: + result_set.stream.put_nowait(data) + else: + log.error("\r\nReceived error message '%s'\r\n\r\nWith result set '%s'", + str(self._response_msg), str(result_set)) + raise GremlinServerError({'code': status_code, + 'message': self._response_msg['status']['message'], + 'exception': self._response_msg['status']['exception']}) + # data is received in chunks def data_received(self, response_chunk, result_set, read_completed=None, http_req_resp=None): # we shouldn't need to use the http_req_resp code as status is sent in response message, but leaving it for now diff --git a/gremlin-python/src/main/python/gremlin_python/driver/serializer.py b/gremlin-python/src/main/python/gremlin_python/driver/serializer.py index 8e207ae986..aa4c70ac7b 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/serializer.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/serializer.py @@ -40,7 +40,7 @@ __author__ = 'David M. Brown (davebs...@gmail.com), Lyndon Bauto (lyndonb@bitqui GraphSONV4 """ -class GraphSONSerializerV4(object): +class GraphSONSerializersV4(object): """ Message serializer for GraphSON. Allow users to pass custom reader, writer, and version kwargs for custom serialization. Otherwise, @@ -89,7 +89,7 @@ class GraphSONSerializerV4(object): msg = json.loads(message if isinstance(message, str) else message.decode('utf-8')) return self._graphson_reader.to_object(msg) else: - # graphSON does not stream, the first chunk contains all info + # graphSON does not stream, all results are aggregated inside the first chunk return "" """ diff --git a/gremlin-python/src/main/python/radish/terrain.py b/gremlin-python/src/main/python/radish/terrain.py index c54334ab0c..f341106467 100644 --- a/gremlin-python/src/main/python/radish/terrain.py +++ b/gremlin-python/src/main/python/radish/terrain.py @@ -96,6 +96,8 @@ def __create_remote(server_graph_name): if world.config.user_data["serializer"] == "application/vnd.graphbinary-v4.0": s = serializer.GraphBinarySerializersV4() + elif world.config.user_data["serializer"] == "application/vnd.gremlin-v4.0+json": + s = serializer.GraphSONSerializersV4() else: raise ValueError('serializer not found - ' + world.config.user_data["serializer"]) diff --git a/gremlin-python/src/main/python/tests/conftest.py b/gremlin-python/src/main/python/tests/conftest.py index 0aafc43d1b..cc958c7c5d 100644 --- a/gremlin-python/src/main/python/tests/conftest.py +++ b/gremlin-python/src/main/python/tests/conftest.py @@ -25,6 +25,7 @@ import pytest import logging import queue +from gremlin_python.driver import serializer from gremlin_python.driver.client import Client from gremlin_python.driver.connection import Connection from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection @@ -110,11 +111,17 @@ def graphbinary_serializer_v4(request): return GraphBinarySerializersV4() -@pytest.fixture(params=['graphbinaryv4']) +@pytest.fixture(params=['graphbinaryv4','graphsonv4']) def remote_connection(request): try: if request.param == 'graphbinaryv4': - remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern') + remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern', + request_serializer=serializer.GraphBinarySerializersV4(), + response_serializer=serializer.GraphBinarySerializersV4()) + elif request.param == 'graphsonv4': + remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern', + request_serializer=serializer.GraphSONSerializersV4(), + response_serializer=serializer.GraphSONSerializersV4()) else: raise ValueError("Invalid serializer option - " + request.param) except OSError: diff --git a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py index 95004076c4..86d6445316 100644 --- a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py +++ b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py @@ -40,8 +40,8 @@ class TestDriverRemoteConnection(object): # in conftest.py and remove this def test_graphSONV4_temp(self): remote_conn = DriverRemoteConnection(test_no_auth_url, 'gmodern', - request_serializer=serializer.GraphSONSerializerV4(), - response_serializer=serializer.GraphSONSerializerV4()) + request_serializer=serializer.GraphSONSerializersV4(), + response_serializer=serializer.GraphSONSerializersV4()) g = traversal().with_(remote_conn) assert long(6) == g.V().count().to_list()[0] # #