This is an automated email from the ASF dual-hosted git repository. xiazcy pushed a commit to branch py-graphson4-enable-tests in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 791b9ccfa7bee26b2ac1d5980d3bc58b41d3f237 Author: Yang Xia <[email protected]> AuthorDate: Wed Oct 30 17:06:55 2024 -0700 change from streaming deserialization to aggregating data and make graphson class naming consistent --- gremlin-python/docker-compose.yml | 3 ++- .../gremlin_python/driver/aiohttp/transport.py | 22 ++++++++++++++- .../python/gremlin_python/driver/connection.py | 8 +++++- .../main/python/gremlin_python/driver/protocol.py | 31 ++++++++++++++++++++++ .../python/gremlin_python/driver/serializer.py | 4 +-- gremlin-python/src/main/python/radish/terrain.py | 2 ++ gremlin-python/src/main/python/tests/conftest.py | 11 ++++++-- .../tests/driver/test_driver_remote_connection.py | 4 +-- 8 files changed, 76 insertions(+), 9 deletions(-) diff --git a/gremlin-python/docker-compose.yml b/gremlin-python/docker-compose.yml index 7fd2c29418..26758e3fa2 100644 --- a/gremlin-python/docker-compose.yml +++ b/gremlin-python/docker-compose.yml @@ -64,7 +64,8 @@ services: && python3 ./setup.py test && python3 ./setup.py install && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0' --user-data='bulked=true' - && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0'; + && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.graphbinary-v4.0' + && radish -f dots -e -t -b ./radish ./gremlin-test --user-data='serializer=application/vnd.gremlin-v4.0+json'; EXIT_CODE=$$?; chown -R `stat -c "%u:%g" .` .; exit $$EXIT_CODE" depends_on: gremlin-server-test-python: diff --git a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py index 3a5d1667e2..f7b915f73e 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py @@ -100,7 +100,27 @@ 
class AiohttpHTTPTransport(AbstractBaseTransport): def read(self, stream_chunk=None): if not stream_chunk: - raise Exception('missing handling of streamed responses to protocol') + ''' + GraphSON does not support streaming deserialization, we are aggregating data and bypassing streamed + deserialization while GraphSON is enabled for testing. Remove after GraphSON is removed. + ''' + async def async_read(): + async with async_timeout.timeout(self._read_timeout): + data_buffer = b"" + async for data, end_of_http_chunk in self._http_req_resp.content.iter_chunks(): + try: + data_buffer += data + except ClientPayloadError: + # server disconnect during streaming will cause ClientPayLoadError from aiohttp + raise GremlinServerError({'code': 500, + 'message': 'Server disconnected - please try to reconnect', + 'exception': ClientPayloadError}) + if self._max_content_len and len( + data_buffer) > self._max_content_len: + raise Exception(f'Response size {len(data_buffer)} exceeds limit {self._max_content_len} bytes') + return data_buffer + return self._loop.run_until_complete(async_read()) + # raise Exception('missing handling of streamed responses to protocol') # Inner function to perform async read. async def async_read(): diff --git a/gremlin-python/src/main/python/gremlin_python/driver/connection.py b/gremlin-python/src/main/python/gremlin_python/driver/connection.py index 5ebfb1cf1a..3217778bb4 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/connection.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/connection.py @@ -86,7 +86,13 @@ class Connection: def _receive(self): try: - self._transport.read(self.stream_chunk) + ''' + GraphSON does not support streaming deserialization, we are aggregating data and bypassing streamed + deserialization while GraphSON is enabled for testing. Remove after GraphSON is removed. 
+ ''' + self._protocol.data_received_aggregate(self._transport.read(), self._result_set) + # re-enable streaming after graphSON removal + # self._transport.read(self.stream_chunk) finally: self._pool.put_nowait(self) diff --git a/gremlin-python/src/main/python/gremlin_python/driver/protocol.py b/gremlin-python/src/main/python/gremlin_python/driver/protocol.py index 930d2de328..48c709de17 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/protocol.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/protocol.py @@ -95,6 +95,37 @@ class GremlinServerHTTPProtocol(AbstractBaseProtocol): self._transport.write(message) + ''' + GraphSON does not support streaming deserialization, we are aggregating data and bypassing streamed + deserialization while GraphSON is enabled for testing. Remove after GraphSON is removed. + ''' + def data_received_aggregate(self, response, result_set): + response_msg = {'status': {'code': 0, + 'message': '', + 'exception': ''}, + 'result': {'meta': {}, + 'data': []}} + + response_msg = self._decode_chunk(response_msg, response, self._is_first_chunk) + + self._is_first_chunk = False + status_code = response_msg['status']['code'] + aggregate_to = response_msg['result']['meta'].get('aggregateTo', 'list') + data = response_msg['result']['data'] + result_set.aggregate_to = aggregate_to + self._is_first_chunk = True + + if status_code == 204 and len(data) == 0: + result_set.stream.put_nowait([]) + elif status_code in [200, 204, 206]: + result_set.stream.put_nowait(data) + else: + log.error("\r\nReceived error message '%s'\r\n\r\nWith result set '%s'", + str(self._response_msg), str(result_set)) + raise GremlinServerError({'code': status_code, + 'message': self._response_msg['status']['message'], + 'exception': self._response_msg['status']['exception']}) + # data is received in chunks def data_received(self, response_chunk, result_set, read_completed=None, http_req_resp=None): # we shouldn't need to use the http_req_resp code as 
status is sent in response message, but leaving it for now diff --git a/gremlin-python/src/main/python/gremlin_python/driver/serializer.py b/gremlin-python/src/main/python/gremlin_python/driver/serializer.py index 8e207ae986..aa4c70ac7b 100644 --- a/gremlin-python/src/main/python/gremlin_python/driver/serializer.py +++ b/gremlin-python/src/main/python/gremlin_python/driver/serializer.py @@ -40,7 +40,7 @@ __author__ = 'David M. Brown ([email protected]), Lyndon Bauto (lyndonb@bitqui GraphSONV4 """ -class GraphSONSerializerV4(object): +class GraphSONSerializersV4(object): """ Message serializer for GraphSON. Allow users to pass custom reader, writer, and version kwargs for custom serialization. Otherwise, @@ -89,7 +89,7 @@ class GraphSONSerializerV4(object): msg = json.loads(message if isinstance(message, str) else message.decode('utf-8')) return self._graphson_reader.to_object(msg) else: - # graphSON does not stream, the first chunk contains all info + # graphSON does not stream, all results are aggregated inside the first chunk return "" """ diff --git a/gremlin-python/src/main/python/radish/terrain.py b/gremlin-python/src/main/python/radish/terrain.py index c54334ab0c..f341106467 100644 --- a/gremlin-python/src/main/python/radish/terrain.py +++ b/gremlin-python/src/main/python/radish/terrain.py @@ -96,6 +96,8 @@ def __create_remote(server_graph_name): if world.config.user_data["serializer"] == "application/vnd.graphbinary-v4.0": s = serializer.GraphBinarySerializersV4() + elif world.config.user_data["serializer"] == "application/vnd.gremlin-v4.0+json": + s = serializer.GraphSONSerializersV4() else: raise ValueError('serializer not found - ' + world.config.user_data["serializer"]) diff --git a/gremlin-python/src/main/python/tests/conftest.py b/gremlin-python/src/main/python/tests/conftest.py index 0aafc43d1b..ff6a0c229d 100644 --- a/gremlin-python/src/main/python/tests/conftest.py +++ b/gremlin-python/src/main/python/tests/conftest.py @@ -25,6 +25,7 @@ import 
pytest import logging import queue +from gremlin_python.driver import serializer from gremlin_python.driver.client import Client from gremlin_python.driver.connection import Connection from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection @@ -110,11 +111,17 @@ def graphbinary_serializer_v4(request): return GraphBinarySerializersV4() -@pytest.fixture(params=['graphbinaryv4']) +@pytest.fixture(params=['graphbinaryv4','graphsonv4']) def remote_connection(request): try: if request.param == 'graphbinaryv4': - remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern') + remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern', + request_serializer=serializer.GraphBinarySerializersV4(), + response_serializer=serializer.GraphBinarySerializersV4() ) + elif request.param == 'graphsonv4': + remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern', + request_serializer=serializer.GraphSONSerializersV4(), + response_serializer=serializer.GraphSONSerializersV4()) else: raise ValueError("Invalid serializer option - " + request.param) except OSError: diff --git a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py index 87a84c766e..77e4c9cd76 100644 --- a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py +++ b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py @@ -40,8 +40,8 @@ class TestDriverRemoteConnection(object): # in conftest.py and remove this def test_graphSONV4_temp(self): remote_conn = DriverRemoteConnection(test_no_auth_url, 'gmodern', - request_serializer=serializer.GraphSONSerializerV4(), - response_serializer=serializer.GraphSONSerializerV4()) + request_serializer=serializer.GraphSONSerializersV4(), + response_serializer=serializer.GraphSONSerializersV4()) g = traversal().with_(remote_conn) assert long(6) == g.V().count().to_list()[0] # #
