This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master-http-final
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 4fa8a353efd419b808487b6003fee44da18e8e22
Author: Yang Xia <55853655+xia...@users.noreply.github.com>
AuthorDate: Thu Oct 31 16:28:05 2024 -0700

    change from streaming deserialization to aggregating data and make graphson 
class naming consistent (#2873)
---
 gremlin-python/docker-compose.yml                  |  3 ++-
 .../gremlin_python/driver/aiohttp/transport.py     | 22 ++++++++++++++-
 .../python/gremlin_python/driver/connection.py     |  8 +++++-
 .../main/python/gremlin_python/driver/protocol.py  | 31 ++++++++++++++++++++++
 .../python/gremlin_python/driver/serializer.py     |  4 +--
 gremlin-python/src/main/python/radish/terrain.py   |  2 ++
 gremlin-python/src/main/python/tests/conftest.py   | 11 ++++++--
 .../tests/driver/test_driver_remote_connection.py  |  4 +--
 8 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/gremlin-python/docker-compose.yml 
b/gremlin-python/docker-compose.yml
index f302c54823..b912c907cc 100644
--- a/gremlin-python/docker-compose.yml
+++ b/gremlin-python/docker-compose.yml
@@ -64,7 +64,8 @@ services:
       && python3 ./setup.py test
       && python3 ./setup.py install
       && radish -f dots -e -t -b ./radish ./gremlin-test 
--user-data='serializer=application/vnd.graphbinary-v4.0' 
--user-data='bulked=true'
-      && radish -f dots -e -t -b ./radish ./gremlin-test 
--user-data='serializer=application/vnd.graphbinary-v4.0';
+      && radish -f dots -e -t -b ./radish ./gremlin-test 
--user-data='serializer=application/vnd.graphbinary-v4.0'
+      && radish -f dots -e -t -b ./radish ./gremlin-test 
--user-data='serializer=application/vnd.gremlin-v4.0+json';
       EXIT_CODE=$$?; chown -R `stat -c "%u:%g" .` .; exit $$EXIT_CODE"
     depends_on:
       gremlin-server-test-python:
diff --git 
a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py 
b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
index b2827e9085..399f6806b8 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
@@ -105,7 +105,27 @@ class AiohttpHTTPTransport(AbstractBaseTransport):
 
     def read(self, stream_chunk=None):
         if not stream_chunk:
-            raise Exception('missing handling of streamed responses to 
protocol')
+            '''
+            GraphSON does not support streaming deserialization, so we are 
aggregating data and bypassing streamed
+             deserialization while GraphSON is enabled for testing. Remove 
after GraphSON is removed.
+            '''
+            async def async_read():
+                async with async_timeout.timeout(self._read_timeout):
+                    data_buffer = b""
+                    async for data, end_of_http_chunk in 
self._http_req_resp.content.iter_chunks():
+                        try:
+                            data_buffer += data
+                        except ClientPayloadError:
+                            # server disconnect during streaming will cause 
ClientPayloadError from aiohttp
+                            raise GremlinServerError({'code': 500,
+                                                      'message': 'Server 
disconnected - please try to reconnect',
+                                                      'exception': 
ClientPayloadError})
+                    if self._max_content_len and len(
+                            data_buffer) > self._max_content_len:
+                        raise Exception(f'Response size {len(data_buffer)} 
exceeds limit {self._max_content_len} bytes')
+                    return data_buffer
+            return self._loop.run_until_complete(async_read())
+            # raise Exception('missing handling of streamed responses to 
protocol')
 
         # Inner function to perform async read.
         async def async_read():
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/connection.py 
b/gremlin-python/src/main/python/gremlin_python/driver/connection.py
index cb34923d0c..434272c5c5 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/connection.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/connection.py
@@ -84,7 +84,13 @@ class Connection:
 
     def _receive(self):
         try:
-            self._transport.read(self.stream_chunk)
+            '''
+            GraphSON does not support streaming deserialization, so we are 
aggregating data and bypassing streamed
+             deserialization while GraphSON is enabled for testing. Remove 
after GraphSON is removed.
+            '''
+            self._protocol.data_received_aggregate(self._transport.read(), 
self._result_set)
+            # re-enable streaming after graphSON removal
+            # self._transport.read(self.stream_chunk)
         finally:
             self._pool.put_nowait(self)
 
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/protocol.py 
b/gremlin-python/src/main/python/gremlin_python/driver/protocol.py
index 930d2de328..48c709de17 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/protocol.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/protocol.py
@@ -95,6 +95,37 @@ class GremlinServerHTTPProtocol(AbstractBaseProtocol):
 
         self._transport.write(message)
 
+    '''
+    GraphSON does not support streaming deserialization, so we are aggregating 
data and bypassing streamed
+     deserialization while GraphSON is enabled for testing. Remove after 
GraphSON is removed.
+    '''
+    def data_received_aggregate(self, response, result_set):
+        response_msg = {'status': {'code': 0,
+                                         'message': '',
+                                         'exception': ''},
+                              'result': {'meta': {},
+                                         'data': []}}
+
+        response_msg = self._decode_chunk(response_msg, response, 
self._is_first_chunk)
+
+        self._is_first_chunk = False
+        status_code = response_msg['status']['code']
+        aggregate_to = response_msg['result']['meta'].get('aggregateTo', 
'list')
+        data = response_msg['result']['data']
+        result_set.aggregate_to = aggregate_to
+        self._is_first_chunk = True
+
+        if status_code == 204 and len(data) == 0:
+            result_set.stream.put_nowait([])
+        elif status_code in [200, 204, 206]:
+            result_set.stream.put_nowait(data)
+        else:
+            log.error("\r\nReceived error message '%s'\r\n\r\nWith result set 
'%s'",
+                      str(self._response_msg), str(result_set))
+            raise GremlinServerError({'code': status_code,
+                                        'message': 
self._response_msg['status']['message'],
+                                        'exception': 
self._response_msg['status']['exception']})
+
     # data is received in chunks
     def data_received(self, response_chunk, result_set, read_completed=None, 
http_req_resp=None):
         # we shouldn't need to use the http_req_resp code as status is sent in 
response message, but leaving it for now
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/serializer.py 
b/gremlin-python/src/main/python/gremlin_python/driver/serializer.py
index 8e207ae986..aa4c70ac7b 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/serializer.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/serializer.py
@@ -40,7 +40,7 @@ __author__ = 'David M. Brown (davebs...@gmail.com), Lyndon 
Bauto (lyndonb@bitqui
 GraphSONV4
 """
 
-class GraphSONSerializerV4(object):
+class GraphSONSerializersV4(object):
     """
     Message serializer for GraphSON. Allow users to pass custom reader,
     writer, and version kwargs for custom serialization. Otherwise,
@@ -89,7 +89,7 @@ class GraphSONSerializerV4(object):
             msg = json.loads(message if isinstance(message, str) else 
message.decode('utf-8'))
             return self._graphson_reader.to_object(msg)
         else:
-            # graphSON does not stream, the first chunk contains all info
+            # graphSON does not stream, all results are aggregated inside the 
first chunk
             return ""
 
 """
diff --git a/gremlin-python/src/main/python/radish/terrain.py 
b/gremlin-python/src/main/python/radish/terrain.py
index c54334ab0c..f341106467 100644
--- a/gremlin-python/src/main/python/radish/terrain.py
+++ b/gremlin-python/src/main/python/radish/terrain.py
@@ -96,6 +96,8 @@ def __create_remote(server_graph_name):
 
     if world.config.user_data["serializer"] == 
"application/vnd.graphbinary-v4.0":
         s = serializer.GraphBinarySerializersV4()
+    elif world.config.user_data["serializer"] == 
"application/vnd.gremlin-v4.0+json":
+        s = serializer.GraphSONSerializersV4()
     else:
         raise ValueError('serializer not found - ' + 
world.config.user_data["serializer"])
 
diff --git a/gremlin-python/src/main/python/tests/conftest.py 
b/gremlin-python/src/main/python/tests/conftest.py
index 0aafc43d1b..cc958c7c5d 100644
--- a/gremlin-python/src/main/python/tests/conftest.py
+++ b/gremlin-python/src/main/python/tests/conftest.py
@@ -25,6 +25,7 @@ import pytest
 import logging
 import queue
 
+from gremlin_python.driver import serializer
 from gremlin_python.driver.client import Client
 from gremlin_python.driver.connection import Connection
 from gremlin_python.driver.driver_remote_connection import 
DriverRemoteConnection
@@ -110,11 +111,17 @@ def graphbinary_serializer_v4(request):
     return GraphBinarySerializersV4()
 
 
-@pytest.fixture(params=['graphbinaryv4'])
+@pytest.fixture(params=['graphbinaryv4','graphsonv4'])
 def remote_connection(request):
     try:
         if request.param == 'graphbinaryv4':
-            remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern')
+            remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern',
+                                                 
request_serializer=serializer.GraphBinarySerializersV4(),
+                                                 
response_serializer=serializer.GraphBinarySerializersV4())
+        elif request.param == 'graphsonv4':
+            remote_conn = DriverRemoteConnection(anonymous_url, 'gmodern',
+                                                 
request_serializer=serializer.GraphSONSerializersV4(),
+                                                 
response_serializer=serializer.GraphSONSerializersV4())
         else:
             raise ValueError("Invalid serializer option - " + request.param)
     except OSError:
diff --git 
a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py 
b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py
index 95004076c4..86d6445316 100644
--- 
a/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py
+++ 
b/gremlin-python/src/main/python/tests/driver/test_driver_remote_connection.py
@@ -40,8 +40,8 @@ class TestDriverRemoteConnection(object):
     # in conftest.py and remove this
     def test_graphSONV4_temp(self):
         remote_conn = DriverRemoteConnection(test_no_auth_url, 'gmodern',
-                                             
request_serializer=serializer.GraphSONSerializerV4(),
-                                             
response_serializer=serializer.GraphSONSerializerV4())
+                                             
request_serializer=serializer.GraphSONSerializersV4(),
+                                             
response_serializer=serializer.GraphSONSerializersV4())
         g = traversal().with_(remote_conn)
         assert long(6) == g.V().count().to_list()[0]
         # #

Reply via email to