added code for new driver, updated driver tests
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1d203b74 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1d203b74 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1d203b74 Branch: refs/heads/TINKERPOP-1599 Commit: 1d203b7494432a5296e04581c9903f138753e964 Parents: 0a08bf9 Author: davebshow <davebs...@gmail.com> Authored: Mon Jan 30 17:22:36 2017 -0500 Committer: davebshow <davebs...@gmail.com> Committed: Fri Feb 17 11:36:57 2017 -0500 ---------------------------------------------------------------------- gremlin-python/pom.xml | 1 + .../python/TraversalSourceGenerator.groovy | 11 +- .../main/jython/gremlin_python/driver/client.py | 114 ++++++++ .../jython/gremlin_python/driver/connection.py | 78 ++++++ .../driver/driver_remote_connection.py | 260 +++---------------- .../jython/gremlin_python/driver/protocol.py | 103 ++++++++ .../gremlin_python/driver/remote_connection.py | 98 ++++--- .../jython/gremlin_python/driver/request.py | 25 ++ .../jython/gremlin_python/driver/resultset.py | 91 +++++++ .../jython/gremlin_python/driver/serializer.py | 117 +++++++++ .../gremlin_python/driver/tornado/__init__.py | 18 ++ .../gremlin_python/driver/tornado/transport.py | 48 ++++ .../jython/gremlin_python/driver/transport.py | 46 ++++ .../jython/gremlin_python/process/traversal.py | 11 +- gremlin-python/src/main/jython/setup.py | 18 +- .../src/main/jython/tests/conftest.py | 72 +++++ .../src/main/jython/tests/driver/test_client.py | 98 +++++++ .../driver/test_driver_remote_connection.py | 150 ++++------- .../test_driver_remote_connection_threaded.py | 77 +++--- .../jython/tests/structure/io/test_graphson.py | 21 +- 20 files changed, 1039 insertions(+), 418 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/pom.xml ---------------------------------------------------------------------- diff --git a/gremlin-python/pom.xml b/gremlin-python/pom.xml index c9d6c65..2b4a80e 100644 --- a/gremlin-python/pom.xml +++ b/gremlin-python/pom.xml @@ -314,6 +314,7 @@ <param>aenum==1.4.5</param> <param>tornado==4.4.1</param> <param>six==1.10.0</param> + <param>futures==3.0.5</param> </libraries> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy index fc76b71..995fe80 100644 --- a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy +++ b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy @@ -116,15 +116,16 @@ class Traversal(object): return tempList def promise(self, cb=None): self.traversal_strategies.apply_async_strategies(self) - future_traversers = self.traversers - future = type(future_traversers)() + future_traversal = self.remote_results + future = type(future_traversal)() def process(f): try: - traversers = f.result() + traversal = f.result() except Exception as e: future.set_exception(e) else: - self.traversers = iter(traversers) + self.traversers = iter(traversal.traversers) + self.side_effects = traversal.side_effects if cb: try: result = cb(self) @@ -134,7 +135,7 @@ class Traversal(object): future.set_result(result) else: future.set_result(self) - future_traversers.add_done_callback(process) + future_traversal.add_done_callback(process) return future http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/client.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/client.py b/gremlin-python/src/main/jython/gremlin_python/driver/client.py new file mode 100644 index 0000000..dec39bf --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/client.py @@ -0,0 +1,114 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +import collections +import functools + +from concurrent.futures import ThreadPoolExecutor + +from six.moves import queue + +from gremlin_python.driver import connection, protocol, request +from gremlin_python.process import traversal + +# This is until concurrent.futures backport 3.1.0 release +try: + from multiprocessing import cpu_count +except ImportError: + # some platforms don't have multiprocessing + def cpu_count(): + return None + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +class Client: + + def __init__(self, url, traversal_source, protocol_factory=None, + transport_factory=None, pool_size=None, max_workers=None, + message_serializer=None, username="", password=""): + self._url = url + self._traversal_source = traversal_source + self._message_serializer = message_serializer + self._username = username + self._password = password + if transport_factory is None: + try: + from gremlin_python.driver.tornado.transport import ( + TornadoTransport) + except ImportError: + raise Exception("Please install Tornado or pass" + "custom transport factory") + else: + transport_factory = lambda: TornadoTransport() + self._transport_factory = transport_factory + if protocol_factory is None: + protocol_factory = lambda: protocol.GremlinServerWSProtocol( + message_serializer=self._message_serializer, + username=self._username, + password=self._password) + self._protocol_factory = protocol_factory + if pool_size is None: + pool_size = 4 + self._pool_size = pool_size + # This is until concurrent.futures backport 3.1.0 release + if max_workers is None: + # Use this number because ThreadPoolExecutor is often + # used to overlap I/O instead of CPU work. + max_workers = (cpu_count() or 1) * 5 + self._executor = ThreadPoolExecutor(max_workers=max_workers) + # Threadsafe queue + self._pool = queue.Queue() + self._fill_pool() + + @property + def executor(self): + return self._executor + + @property + def traversal_source(self): + return self._traversal_source + + def _fill_pool(self): + for i in range(self._pool_size): + conn = self._get_connection() + self._pool.put_nowait(conn) + + def close(self): + while not self._pool.empty(): + conn = self._pool.get(True) + conn.close() + self._executor.shutdown() + + def _get_connection(self): + protocol = self._protocol_factory() + return connection.Connection( + self._url, self._traversal_source, protocol, + self._transport_factory, self._executor, self._pool) + + def submit(self, message): + return self.submitAsync(message).result() + + def submitAsync(self, message): + if isinstance(message, traversal.Bytecode): + message = request.RequestMessage( + processor='traversal', op='bytecode', + args={'gremlin': message, + 'aliases': {'g': self._traversal_source}}) + conn = self._pool.get(True) + return conn.write(message) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py new file mode 100644 index 0000000..abc4545 --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py @@ -0,0 +1,78 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +import uuid +from concurrent.futures import Future +from six.moves import queue + +from gremlin_python.driver import resultset + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +class Connection: + + def __init__(self, url, traversal_source, protocol, transport_factory, + executor, pool): + self._url = url + self._traversal_source = traversal_source + self._protocol = protocol + self._transport_factory = transport_factory + self._executor = executor + self._transport = None + self._pool = pool + self._results = {} + self.connect() + + def connect(self): + if self._transport: + self._transport.close() + self._transport = self._transport_factory() + self._transport.connect(self._url) + self._protocol.connection_made(self._transport) + + def close(self): + self._transport.close() + + def write(self, request_message): + request_id = str(uuid.uuid4()) + result_set = resultset.ResultSet(queue.Queue(), request_id) + self._results[request_id] = result_set + # Create write task + future = Future() + future_write = self._executor.submit( + self._protocol.write, request_id, request_message) + + def cb(f): + try: + f.result() + except Exception as e: + future.set_exception(e) + else: + # Start receive task + done = self._executor.submit(self._receive) + result_set.done = done + future.set_result(result_set) + + future_write.add_done_callback(cb) + return future + + def _receive(self): + data = self._transport.read() + self._protocol.data_received(data, self._results) + self._pool.put_nowait(self) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py index 2cbe0e7..fb0e4ba 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py @@ -16,236 +16,50 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import base64 -import functools -import json -import uuid -from tornado import gen -from tornado import concurrent -from tornado import ioloop -from tornado import websocket +from concurrent.futures import Future -from gremlin_python.structure.io.graphson import GraphSONReader, GraphSONWriter -from .remote_connection import RemoteConnection -from .remote_connection import RemoteTraversal -from .remote_connection import RemoteTraversalSideEffects +from gremlin_python.driver import client +from gremlin_python.driver.remote_connection import ( + RemoteConnection, RemoteTraversal, RemoteTraversalSideEffects) - -class GremlinServerError(Exception): - pass +__author__ = 'David M. Brown (davebs...@gmail.com)' class DriverRemoteConnection(RemoteConnection): - def __init__(self, url, traversal_source, username="", password="", loop=None, graphson_reader=None, graphson_writer=None): - super(DriverRemoteConnection, self).__init__(url, traversal_source) - self._url = url - self._username = username - self._password = password - if loop is None: - loop = ioloop.IOLoop.current() - self._loop = loop - self._websocket = self._loop.run_sync(lambda: websocket.websocket_connect(self.url)) - self._graphson_reader = graphson_reader or GraphSONReader() - self._graphson_writer = graphson_writer or GraphSONWriter() - - def submit(self, bytecode): - ''' - :param bytecode: the bytecode of a traversal to submit to the RemoteConnection - :return: a RemoteTraversal with RemoteTraversalSideEffects - ''' - request_id = str(uuid.uuid4()) - traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode)) - keys, value, close = self._get_side_effect_lambdas(request_id) - return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys, value, close, self._loop)) - - def submit_async(self, bytecode): - request_id = str(uuid.uuid4()) - future_traversers = self.submit_traversal_bytecode(request_id, bytecode) - keys, value, close = self._get_side_effect_lambdas(request_id) - side_effects = RemoteTraversalSideEffects(keys, value, close, self._loop) - return RemoteTraversal(future_traversers, side_effects) - - @gen.coroutine - def submit_traversal_bytecode(self, request_id, bytecode): - message = { - "requestId": { - "@type": "g:UUID", - "@value": request_id - }, - "op": "bytecode", - "processor": "traversal", - "args": { - "gremlin": self._graphson_writer.toDict(bytecode), - "aliases": {"g": self.traversal_source} - } - } - traversers = yield self._execute_message(message) - raise gen.Return(traversers) - - @gen.coroutine - def submit_sideEffect_keys(self, request_id): - message = { - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "keys", - "processor": "traversal", - "args": { - "sideEffect": { - "@type": "g:UUID", - "@value": request_id - } - } - } - keys = yield self._execute_message(message) - raise gen.Return(set(keys)) - - @gen.coroutine - def submit_sideEffect_value(self, request_id, key): - message = { - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "gather", - "processor": "traversal", - "args": { - "sideEffect": { - "@type": "g:UUID", - "@value": request_id - }, - "sideEffectKey": key, - "aliases": {"g": self.traversal_source} - } - } - try: - value = yield self._execute_message(message) - except: - raise KeyError(key) - raise gen.Return(value) - - @gen.coroutine - def submit_sideEffect_close(self, request_id): - message = { - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "close", - "processor": "traversal", - "args": { - "sideEffect": { - "@type": "g:UUID", - "@value": request_id - } - } - } - result = yield self._execute_message(message) - raise gen.Return(result) - def _get_side_effect_lambdas(self, request_id): - side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id)) - side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key)) - side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id)) - return side_effect_keys, side_effect_value, side_effect_close - - @gen.coroutine - def _execute_message(self, send_message): - send_message = b"".join([b"\x21", - b"application/vnd.gremlin-v2.0+json", - json.dumps(send_message, separators=(',', ':')).encode("utf-8")]) - if self._websocket.protocol is None: - self._websocket = yield websocket.websocket_connect(self.url) - self._websocket.write_message(send_message, binary=True) - response = Response(self._websocket, self._username, self._password, self._graphson_reader) - results = None - while True: - recv_message = yield response.receive() - if recv_message is None: - break - aggregateTo = recv_message[0] - # on first message, get the right result data structure - if None == results: - if "list" == aggregateTo: - results = [] - elif "set" == aggregateTo: - results = set() - elif aggregateTo in ["map", "bulkset"]: - results = {} - elif "none" == aggregateTo: - results = None - else: - results = [] - - # if there is no update to a structure, then the item is the result - if results is None: - results = recv_message[1][0] - # updating a map is different than a list or a set - elif isinstance(results, dict): - if "map" == aggregateTo: - for item in recv_message[1]: - results.update(item) - else: - for item in recv_message[1]: - results[item.object] = item.bulk - # flat add list to result list - else: - results += recv_message[1] - raise gen.Return([] if None == results else results) + def __init__(self, url, traversal_source, protocol_factory=None, + transport_factory=None, pool_size=None, max_workers=None, + username="", password=""): + self._client = client.Client(url, traversal_source, protocol_factory, + transport_factory, pool_size, max_workers, + None, username, password) + self._url = self._client._url + self._traversal_source = self._client._traversal_source def close(self): - self._websocket.close() - + self._client.close() -class Response: - def __init__(self, websocket, username, password, graphson_reader): - self._websocket = websocket - self._username = username - self._password = password - self._closed = False - self._graphson_reader = graphson_reader - - @gen.coroutine - def receive(self): - if self._closed: - return - recv_message = yield self._websocket.read_message() - recv_message = json.loads(recv_message.decode('utf-8')) - status_code = recv_message["status"]["code"] - aggregateTo = recv_message["result"]["meta"].get("aggregateTo", "list") + def submit(self, bytecode): + result_set = self._client.submit(bytecode) + results = result_set.all().result() + side_effects = RemoteTraversalSideEffects(result_set.request_id, + self._client) + return RemoteTraversal(iter(results), side_effects) + + def submitAsync(self, bytecode): + future = Future() + future_result_set = self._client.submitAsync(bytecode) + + def cb(f): + try: + result_set = f.result() + except Exception as e: + future.set_exception(e) + else: + results = result_set.all().result() + side_effects = RemoteTraversalSideEffects(result_set.request_id, + self._client) + future.set_result(RemoteTraversal(iter(results), side_effects)) - # authentification required then - if status_code == 407: - self._websocket.write_message( - b"".join([b"\x21", - b"application/vnd.gremlin-v2.0+json", - json.dumps({ - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "authentication", - "processor": "traversal", - "args": { - "sasl": base64.b64encode( - b"".join([b"\x00", self._username.encode("utf-8"), - b"\x00", self._password.encode("utf-8")])).decode() - } - }, separators=(',', ':')).encode("utf-8")]), binary=True) - results = yield self.receive() - raise gen.Return(results) - elif status_code == 204: - self._closed = True - return - elif status_code in [200, 206]: - results = [] - for item in recv_message["result"]["data"]: - results.append(self._graphson_reader.toObject(item)) - if status_code == 200: - self._closed = True - raise gen.Return((aggregateTo, results)) - else: - self._closed = True - raise GremlinServerError( - "{0}: {1}".format(status_code, recv_message["status"]["message"])) + future_result_set.add_done_callback(cb) + return future http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py new file mode 100644 index 0000000..2ace35e --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py @@ -0,0 +1,103 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +import abc +import base64 +import collections +import json +import uuid + +import six + +from gremlin_python.driver import serializer, request + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +class GremlinServerError(Exception): + pass + + +@six.add_metaclass(abc.ABCMeta) +class AbstractBaseProtocol: + + @abc.abstractmethod + def connection_made(self, transport): + self._transport = transport + + @abc.abstractmethod + def data_received(self, message): + pass + + @abc.abstractmethod + def write(self, request_id, request_message): + pass + + +class GremlinServerWSProtocol(AbstractBaseProtocol): + + def __init__(self, message_serializer=None, username='', password=''): + if message_serializer is None: + message_serializer = serializer.GraphSONMessageSerializer() + self._message_serializer = message_serializer + self._username = username + self._password = password + + def connection_made(self, transport): + super(GremlinServerWSProtocol, self).connection_made(transport) + + def write(self, request_id, request_message): + message = self._message_serializer.serialize_message( + request_id, request_message) + self._transport.write(message) + + def data_received(self, data, results_dict): + data = json.loads(data.decode('utf-8')) + request_id = data['requestId'] + result_set = results_dict[request_id] + status_code = data['status']['code'] + aggregate_to = data['result']['meta'].get('aggregateTo', 'list') + result_set.aggregate_to = aggregate_to + if status_code == 407: + auth = b''.join([b'\x00', self._username.encode('utf-8'), + b'\x00', self._password.encode('utf-8')]) + request_message = request.RequestMessage( + 'traversal', 'authentication', + {'sasl': base64.b64encode(auth).decode()}) + self.write(request_id, request_message) + data = self._transport.read() + self.data_received(data, results_dict) + elif status_code == 204: + result_set.stream.put_nowait([]) + del results_dict[request_id] + elif status_code in [200, 206]: + results = [] + for msg in data["result"]["data"]: + results.append( + self._message_serializer.deserialize_message(msg)) + result_set.stream.put_nowait(results) + if status_code == 206: + data = self._transport.read() + self.data_received(data, results_dict) + else: + # result_set.done.set_result(None) + del results_dict[request_id] + else: + del results_dict[request_id] + raise GremlinServerError( + "{0}: {1}".format(status_code, data["status"]["message"])) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py index f7ed48e..c48cfe3 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py @@ -17,11 +17,11 @@ specific language governing permissions and limitations under the License. ''' import abc +import collections import six -from ..process.traversal import Traversal -from ..process.traversal import TraversalStrategy -from ..process.traversal import TraversalSideEffects +from gremlin_python.driver import request +from gremlin_python.process import traversal __author__ = 'Marko A. Rodriguez (http://markorodriguez.com)' @@ -42,45 +42,53 @@ class RemoteConnection(object): @abc.abstractmethod def submit(self, bytecode): - print("sending " + bytecode + " to GremlinServer...") - return RemoteTraversal(iter([]), TraversalSideEffects()) + pass def __repr__(self): return "remoteconnection[" + self._url + "," + self._traversal_source + "]" -class RemoteTraversal(Traversal): +class RemoteTraversal(traversal.Traversal): def __init__(self, traversers, side_effects): - Traversal.__init__(self, None, None, None) + super(RemoteTraversal, self).__init__(None, None, None) self.traversers = traversers - self.side_effects = side_effects + self._side_effects = side_effects + + @property + def side_effects(self): + return self._side_effects + + @side_effects.setter + def side_effects(self, val): + self._side_effects = val -class RemoteTraversalSideEffects(TraversalSideEffects): - def __init__(self, keys_lambda, value_lambda, close_lambda, loop): - self._keys_lambda = keys_lambda - self._value_lambda = value_lambda - self._close_lambda = close_lambda - self._loop = loop +class RemoteTraversalSideEffects(traversal.TraversalSideEffects): + def __init__(self, side_effect, client): + self._side_effect = side_effect + self._client = client self._keys = set() self._side_effects = {} self._closed = False def keys(self): - if self._loop._running: - raise RuntimeError("Cannot call side effect methods" - "while event loop is running") if not self._closed: - self._keys = self._keys_lambda() + message = request.RequestMessage( + 'traversal', 'keys', + {'sideEffect': self._side_effect, + 'aliases': {'g': self._client.traversal_source}}) + self._keys = set(self._client.submit(message).all().result()) return self._keys def get(self, key): - if self._loop._running: - raise RuntimeError("Cannot call side effect methods" - "while event loop is running") + if not self._side_effects.get(key): if not self._closed: - results = self._value_lambda(key) + message = request.RequestMessage( + 'traversal', 'gather', + {'sideEffect': self._side_effect, 'sideEffectKey': key, + 'aliases': {'g': self._client.traversal_source}}) + results = self._aggregate_results(self._client.submit(message)) self._side_effects[key] = results self._keys.add(key) else: @@ -88,27 +96,57 @@ class RemoteTraversalSideEffects(TraversalSideEffects): return self._side_effects[key] def close(self): - if self._loop._running: - raise RuntimeError("Cannot call side effect methods" - "while event loop is running") - results = self._close_lambda() + if not self._closed: + message = request.RequestMessage( + 'traversal', 'close', + {'sideEffect': self._side_effect, + 'aliases': {'g': self._client._traversal_source}}) + results = self._client.submit(message).all().result() self._closed = True return results + def _aggregate_results(self, result_set): + aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {}, + 'none': None} + results = None + for msg in result_set: + if results is None: + aggregate_to = result_set.aggregate_to + results = aggregates.get(aggregate_to, []) + # on first message, get the right result data structure + # if there is no update to a structure, then the item is the result + if results is None: + results = msg[0] + # updating a map is different than a list or a set + elif isinstance(results, dict): + if aggregate_to == "map": + for item in msg: + results.update(item) + else: + for item in msg: + results[item.object] = item.bulk + elif isinstance(results, set): + results.update(msg) + # flat add list to result list + else: + results += msg + if results is None: + results = [] + return results + -class RemoteStrategy(TraversalStrategy): +class RemoteStrategy(traversal.TraversalStrategy): def __init__(self, remote_connection): self.remote_connection = remote_connection def apply(self, traversal): if traversal.traversers is None: remote_traversal = self.remote_connection.submit(traversal.bytecode) + traversal.remote_results = remote_traversal traversal.side_effects = remote_traversal.side_effects traversal.traversers = remote_traversal.traversers def apply_async(self, traversal): if traversal.traversers is None: - remote_traversal = self.remote_connection.submit_async( + traversal.remote_results = self.remote_connection.submitAsync( traversal.bytecode) - traversal.side_effects = remote_traversal.side_effects - traversal.traversers = remote_traversal.traversers http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/request.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/request.py b/gremlin-python/src/main/jython/gremlin_python/driver/request.py new file mode 100644 index 0000000..ac7b845 --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/request.py @@ -0,0 +1,25 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +import collections + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +RequestMessage = collections.namedtuple( + 'RequestMessage', ['processor', 'op', 'args']) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py new file mode 100644 index 0000000..cfdca5b --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py @@ -0,0 +1,91 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +from concurrent.futures import Future + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +class ResultSet: + + def __init__(self, stream, request_id): + self._stream = stream + self._request_id = request_id + self._done = None + self._aggregate_to = None + + @property + def aggregate_to(self): + return self._aggregate_to + + @aggregate_to.setter + def aggregate_to(self, val): + self._aggregate_to = val + + @property + def request_id(self): + return self._request_id + + @property + def stream(self): + return self._stream + + def __iter__(self): + return self + + def __next__(self): + result = self.one() + if not result: + raise StopIteration + return result + + def next(self): + return self.__next__() + + @property + def done(self): + return self._done + + @done.setter + def done(self, future): + self._done = future + + def one(self): + while not self.done.done(): + if not self.stream.empty(): + return self.stream.get_nowait() + if not self.stream.empty(): + return self.stream.get_nowait() + return self.done.result() + + def all(self): + future = Future() + + def cb(f): + try: + f.result() + except Exception as e: + future.set_exception(e) + else: + results = [] + while not self.stream.empty(): + results += self.stream.get_nowait() + future.set_result(results) + + self.done.add_done_callback(cb) + return future http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py b/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py new file mode 100644 index 0000000..10dcfd3 --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/serializer.py @@ -0,0 +1,117 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +try: + import ujson as json +except ImportError: + import json + +from gremlin_python.structure.io import graphson + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +class Processor: + """Base class for OpProcessor serialization system.""" + + _graphson_writer = graphson.GraphSONWriter() + + def __init__(self, default_args): + self._default_args = default_args + + def get_op_args(self, op, args): + op_method = getattr(self, op, None) + if not op_method: + raise Exception("Processor does not support op: {}".format(op)) + args_ = self._default_args.get(op, dict()).copy() + args_.update(args) + return op_method(args_) + + +class GraphSONMessageSerializer: + """Message serializer for GraphSON""" + + _graphson_reader = graphson.GraphSONReader() + + class traversal(Processor): + + def authentication(self, args): + return args + + def bytecode(self, args): + gremlin = args['gremlin'] + args['gremlin'] = self._graphson_writer.toDict(gremlin) + aliases = args.get('aliases', '') + if not aliases: + aliases = {'g': 'g'} + args['aliases'] = aliases + return args + + def close(self, args): + return self.keys(args) + + def gather(self, args): + side_effect = args['sideEffect'] + args['sideEffect'] = {'@type': 'g:UUID', '@value': side_effect} + aliases = args.get('aliases', '') + if not aliases: + aliases = {'g': 'g'} + args['aliases'] = aliases + return args + + def keys(self, args): + side_effect = args['sideEffect'] + args['sideEffect'] = {'@type': 'g:UUID', '@value': side_effect} + return args + + + def get_processor(self, processor): + processor = getattr(self, processor, None) + if not processor: + raise Exception("Unknown processor") + return processor({}) + + def serialize_message(self, request_id, request_message): + processor = request_message.processor + op = request_message.op + args = request_message.args + if not processor: + processor_obj = self.get_processor('standard') + else: + processor_obj = self.get_processor(processor) + args = processor_obj.get_op_args(op, args) + message = self.build_message(request_id, processor, op, args) + return message + + def build_message(self, request_id, processor, op, args): + message = { + 'requestId': {'@type': 'g:UUID', '@value': request_id}, + 'processor': processor, + 'op': op, + 'args': args + } + return self.finalize_message(message, b"\x21", + b"application/vnd.gremlin-v2.0+json") + + def finalize_message(self, message, mime_len, mime_type): + message = json.dumps(message) + message = b''.join([mime_len, mime_type, message.encode('utf-8')]) + return message + + def deserialize_message(self, message): + return self._graphson_reader.toObject(message) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py new file mode 100644 index 0000000..17b49a5 --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/__init__.py @@ -0,0 +1,18 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py new file mode 100644 index 0000000..cc218e9 --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/tornado/transport.py @@ -0,0 +1,48 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +from tornado import ioloop, gen +from tornado import websocket + +from gremlin_python.driver.transport import AbstractBaseTransport + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +class TornadoTransport(AbstractBaseTransport): + + def __init__(self): + self._loop = ioloop.IOLoop(make_current=False) + + def connect(self, url): + self._ws = self._loop.run_sync( + lambda: websocket.websocket_connect(url)) + + def write(self, message): + self._loop.run_sync( + lambda: self._ws.write_message(message, binary=True)) + + def read(self): + return self._loop.run_sync(lambda: self._ws.read_message()) + + def close(self): + self._ws.close() + self._loop.close() + + def closed(self): + return not self._ws.protocol http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/driver/transport.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/transport.py b/gremlin-python/src/main/jython/gremlin_python/driver/transport.py new file mode 100644 index 0000000..9181956 --- /dev/null +++ b/gremlin-python/src/main/jython/gremlin_python/driver/transport.py @@ -0,0 +1,46 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +""" +import abc +import six + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +@six.add_metaclass(abc.ABCMeta) +class AbstractBaseTransport: + + @abc.abstractmethod + def connect(self, url): + pass + + @abc.abstractmethod + def write(self, message): + pass + + @abc.abstractmethod + def read(self): + pass + + @abc.abstractmethod + def close(self): + pass + + @abc.abstractproperty + def closed(self): + pass http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/gremlin_python/process/traversal.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py index 07afeac..f09ab76 100644 --- a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py +++ b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py @@ -79,15 +79,16 @@ class Traversal(object): return tempList def promise(self, cb=None): self.traversal_strategies.apply_async_strategies(self) - future_traversers = self.traversers - future = type(future_traversers)() + future_traversal = self.remote_results + future = type(future_traversal)() def process(f): try: - traversers = f.result() + traversal = f.result() except Exception as e: future.set_exception(e) else: - self.traversers = iter(traversers) + self.traversers = iter(traversal.traversers) + self.side_effects = traversal.side_effects if cb: try: result = cb(self) @@ -97,7 +98,7 @@ class Traversal(object): future.set_result(result) else: future.set_result(self) - future_traversers.add_done_callback(process) + future_traversal.add_done_callback(process) return future http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/setup.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/setup.py b/gremlin-python/src/main/jython/setup.py index 6878c34..8b30b9f 100644 --- a/gremlin-python/src/main/jython/setup.py +++ b/gremlin-python/src/main/jython/setup.py @@ -18,8 +18,9 @@ under the License. ''' import codecs import os +import sys import time -from setuptools import setup, Command +from setuptools import setup # Folder containing the setup.py root = os.path.dirname(os.path.abspath(__file__)) @@ -43,6 +44,15 @@ from gremlin_python import __version__ version = __version__.version +install_requires = [ + 'aenum==1.4.5', + 'tornado==4.4.1', + 'six==1.10.0' +] + +if sys.version_info < (3,2): + install_requires += ['futures==3.0.5'] + setup( name='gremlinpython', version=version, @@ -60,11 +70,7 @@ setup( 'pytest', 'mock' ], - install_requires=[ - 'aenum==1.4.5', - 'tornado==4.4.1', - 'six==1.10.0' - ], + install_requires=install_requires, classifiers=[ "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/tests/conftest.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/tests/conftest.py b/gremlin-python/src/main/jython/tests/conftest.py new file mode 100644 index 0000000..3ab64a8 --- /dev/null +++ b/gremlin-python/src/main/jython/tests/conftest.py @@ -0,0 +1,72 @@ +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +''' +import concurrent.futures +import pytest + +from six.moves import queue + +from gremlin_python.driver.client import Client +from gremlin_python.driver.connection import Connection +from gremlin_python.driver.driver_remote_connection import ( + DriverRemoteConnection) +from gremlin_python.driver.protocol import GremlinServerWSProtocol +from gremlin_python.driver.tornado.transport import TornadoTransport + + +@pytest.fixture +def connection(request): + try: + protocol = GremlinServerWSProtocol( + username='stephen', password='password') + executor = concurrent.futures.ThreadPoolExecutor(5) + pool = queue.Queue() + conn = Connection('ws://localhost:45940/gremlin', 'g', protocol, + lambda: TornadoTransport(), executor, pool) + except: + pytest.skip('Gremlin Server is not running') + else: + def fin(): + executor.shutdown() + conn.close() + request.addfinalizer(fin) + return conn + +@pytest.fixture +def client(request): + try: + client = Client('ws://localhost:45940/gremlin', 'g') + except: + pytest.skip('Gremlin Server is not running') + else: + def fin(): + client.close() + request.addfinalizer(fin) + return client + +@pytest.fixture +def remote_connection(request): + try: + remote_conn = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') + except: + pytest.skip('Gremlin Server is not running') + else: + def fin(): + remote_conn.close() + request.addfinalizer(fin) + return remote_conn http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/tests/driver/test_client.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/tests/driver/test_client.py b/gremlin-python/src/main/jython/tests/driver/test_client.py new file mode 100644 index 0000000..6395d7b --- /dev/null +++ b/gremlin-python/src/main/jython/tests/driver/test_client.py @@ -0,0 +1,98 @@ +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +''' +import pytest + +from gremlin_python.driver.client import Client +from gremlin_python.driver.request import RequestMessage +from gremlin_python.structure.graph import Graph + +__author__ = 'David M. Brown (davebs...@gmail.com)' + + +def test_connection(connection): + g = Graph().traversal() + t = g.V() + message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode}) + results_set = connection.write(message).result() + future = results_set.all() + results = future.result() + assert len(results) == 6 + assert isinstance(results, list) + +def test_client(client): + g = Graph().traversal() + t = g.V() + message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode}) + result_set = client.submit(message) + assert len(result_set.all().result()) == 6 + client.close() + +def test_iterate_result_set(client): + g = Graph().traversal() + t = g.V() + message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode}) + result_set = client.submit(message) + results = [] + for result in result_set: + results += result + assert len(results) == 6 + client.close() + +def test_client_async(client): + g = Graph().traversal() + t = g.V() + message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode}) + future = client.submitAsync(message) + assert not future.done() + result_set = future.result() + assert len(result_set.all().result()) == 6 + client.close() + +def test_connection_share(client): + # Overwrite fixture with pool_size=1 client + client = Client('ws://localhost:45940/gremlin', 'g', pool_size=1) + g = Graph().traversal() + t = g.V() + message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode}) + future = client.submitAsync(message) + future2 = client.submitAsync(message) + + result_set2 = future2.result() + assert len(result_set2.all().result()) == 6 + + # This future has to finish for the second to yield result - pool_size=1 + assert future.done() + result_set = future.result() + assert len(result_set.all().result()) == 6 + client.close() + +def test_multi_conn_pool(client): + g = Graph().traversal() + t = g.V() + message = RequestMessage('traversal', 'bytecode', {'gremlin': t.bytecode}) + future = client.submitAsync(message) + future2 = client.submitAsync(message) + + result_set2 = future2.result() + assert len(result_set2.all().result()) == 6 + + # with connection pool `future` may or may not be done here + result_set = future.result() + assert len(result_set.all().result()) == 6 + client.close() http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py index 783cf7e..cd64e2b 100644 --- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py +++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py @@ -16,19 +16,14 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' - -__author__ = 'Marko A. Rodriguez (http://markorodriguez.com)' - -import unittest -from unittest import TestCase - import pytest from tornado import ioloop, gen from gremlin_python import statics from gremlin_python.statics import long -from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection +from gremlin_python.driver.driver_remote_connection import ( + DriverRemoteConnection) from gremlin_python.process.traversal import Traverser from gremlin_python.process.traversal import TraversalStrategy from gremlin_python.process.graph_traversal import __ @@ -36,16 +31,17 @@ from gremlin_python.structure.graph import Graph from gremlin_python.structure.graph import Vertex from gremlin_python.process.strategies import SubgraphStrategy +__author__ = 'Marko A. Rodriguez (http://markorodriguez.com)' + -class TestDriverRemoteConnection(TestCase): - def test_traversals(self): +class TestDriverRemoteConnection(object): + def test_traversals(self, remote_connection): statics.load_statics(globals()) - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') - assert "remoteconnection[ws://localhost:45940/gremlin,g]" == str(connection) - g = Graph().traversal().withRemote(connection) + assert "remoteconnection[ws://localhost:45940/gremlin,g]" == str(remote_connection) + g = Graph().traversal().withRemote(remote_connection) assert long(6) == g.V().count().toList()[0] - # + # # assert Vertex(1) == g.V(1).next() assert 1 == g.V(1).id().next() assert Traverser(Vertex(1)) == g.V(1).nextTraverser() @@ -56,26 +52,24 @@ class TestDriverRemoteConnection(TestCase): assert 2 == len(results) assert "lop" in results assert "ripple" in results - # + # # assert 10 == g.V().repeat(both()).times(5)[0:10].count().next() - assert 1 == g.V().repeat(both()).times(5)[0].count().next() + assert 1 == g.V().repeat(both()).times(5)[0:1].count().next() assert 0 == g.V().repeat(both()).times(5)[0:0].count().next() assert 4 == g.V()[2:].count().next() assert 2 == g.V()[:2].count().next() - # + # # results = g.withSideEffect('a',['josh','peter']).V(1).out('created').in_('created').values('name').where(within('a')).toList() assert 2 == len(results) assert 'josh' in results assert 'peter' in results - # todo: need a traversal metrics deserializer + # # todo: need a traversal metrics deserializer g.V().out().profile().next() - connection.close() - def test_strategies(self): + def test_strategies(self, remote_connection): statics.load_statics(globals()) - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') # - g = Graph().traversal().withRemote(connection). \ + g = Graph().traversal().withRemote(remote_connection). \ withStrategies(TraversalStrategy("SubgraphStrategy", {"vertices": __.hasLabel("person"), "edges": __.hasLabel("created")})) @@ -84,7 +78,7 @@ class TestDriverRemoteConnection(TestCase): assert 1 == g.V().label().dedup().count().next() assert "person" == g.V().label().dedup().next() # - g = Graph().traversal().withRemote(connection). \ + g = Graph().traversal().withRemote(remote_connection). \ withStrategies(SubgraphStrategy(vertices=__.hasLabel("person"), edges=__.hasLabel("created"))) assert 4 == g.V().count().next() assert 0 == g.E().count().next() @@ -98,24 +92,19 @@ class TestDriverRemoteConnection(TestCase): assert "person" == g.V().label().next() assert "marko" == g.V().name.next() # - g = Graph().traversal().withRemote(connection).withComputer() + g = Graph().traversal().withRemote(remote_connection).withComputer() assert 6 == g.V().count().next() assert 6 == g.E().count().next() - connection.close() - def test_side_effects(self): + def test_side_effects(self, remote_connection): statics.load_statics(globals()) - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') # - g = Graph().traversal().withRemote(connection) + g = Graph().traversal().withRemote(remote_connection) ### t = g.V().hasLabel("project").name.iterate() assert 0 == len(t.side_effects.keys()) - try: + with pytest.raises(Exception): m = t.side_effects["m"] - raise Exception("Accessing a non-existent key should throw an error") - except KeyError: - pass ### t = g.V().out("created").groupCount("m").by("name") results = t.toSet() @@ -131,7 +120,7 @@ class TestDriverRemoteConnection(TestCase): assert 1 == m["ripple"] assert isinstance(m["lop"], long) assert isinstance(m["ripple"], long) - ### + ## t = g.V().out("created").groupCount("m").by("name").name.aggregate("n") results = t.toSet() assert 2 == len(results) @@ -154,16 +143,11 @@ class TestDriverRemoteConnection(TestCase): assert 32 == list(results)[0] assert 32 == t.side_effects['m'] assert 1 == len(t.side_effects.keys()) - try: + with pytest.raises(Exception): x = t.side_effects["x"] - raise Exception("Accessing a non-existent key should throw an error") - except KeyError: - pass - connection.close() - def test_side_effect_close(self): - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') - g = Graph().traversal().withRemote(connection) + def test_side_effect_close(self, remote_connection): + g = Graph().traversal().withRemote(remote_connection) t = g.V().aggregate('a').aggregate('b') t.toList() @@ -193,76 +177,30 @@ class TestDriverRemoteConnection(TestCase): # Try to get 'b' directly from server, should throw error with pytest.raises(Exception): t.side_effects.value_lambda('b') - connection.close() - def test_promise(self): - loop = ioloop.IOLoop.current() - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') - g = Graph().traversal().withRemote(connection) - - @gen.coroutine - def go(): - future_traversal = g.V().promise(lambda x: x.toList()) - assert not future_traversal.done() - resp = yield future_traversal - assert future_traversal.done() - assert len(resp) == 6 - count = yield g.V().count().promise(lambda x: x.next()) - assert count == 6 - - loop.run_sync(go) - connection.close() - - def test_promise_side_effects(self): - loop = ioloop.IOLoop.current() - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') - g = Graph().traversal().withRemote(connection) - - # Side effects are problematic in coroutines. - # Because they are designed to be synchronous (calling `run_sync`) - # they result in an error if called from a coroutine because - # the event loop is already running - @gen.coroutine - def go(): - traversal = yield g.V().aggregate('a').promise() - # Calling synchronous side effect methods from coroutine raises. - with pytest.raises(RuntimeError): - keys = traversal.side_effects.keys() - - with pytest.raises(RuntimeError): - keys = traversal.side_effects.get('a') - - with pytest.raises(RuntimeError): - keys = traversal.side_effects.close() - - loop.run_sync(go) - - # If we return the traversal though, we can use side effects per usual. - @gen.coroutine - def go(): - traversal = yield g.V().aggregate('a').promise() - raise gen.Return(traversal) # Weird legacy Python compatible idiom - - # See, normal side effects. - traversal = loop.run_sync(go) - a, = traversal.side_effects.keys() + def test_promise(self, remote_connection): + g = Graph().traversal().withRemote(remote_connection) + future = g.V().aggregate('a').promise() + t = future.result() + assert len(t.toList()) == 6 + a, = t.side_effects.keys() assert a == 'a' - results = traversal.side_effects.get('a') + results = t.side_effects.get('a') assert results - results = traversal.side_effects.close() + results = t.side_effects.close() assert not results - connection.close() - -if __name__ == '__main__': - test = False - try: - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') - test = True - connection.close() - except: - print("GremlinServer is not running and this test case will not execute: " + __file__) +def test_in_tornado_app(remote_connection): + # Make sure nothing weird with loops + @gen.coroutine + def go(): + conn = DriverRemoteConnection( + 'ws://localhost:45940/gremlin', 'g', pool_size=4) + g = Graph().traversal().withRemote(conn) + yield gen.sleep(0) + assert len(g.V().toList()) == 6 + conn.close() - if test: - unittest.main() + io_loop = ioloop.IOLoop.current() + io_loop.run_sync(go) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py index 0c18651..d5efc0d 100644 --- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py +++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection_threaded.py @@ -16,58 +16,53 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' - - -__author__ = 'David M. Brown (davebs...@gmail.com)' - - import sys from threading import Thread import pytest - from six.moves import queue -from tornado import ioloop from gremlin_python.driver.driver_remote_connection import ( DriverRemoteConnection) from gremlin_python.structure.graph import Graph - -skip = False -try: - connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') - connection.close() -except: - skip = True +__author__ = 'David M. Brown (davebs...@gmail.com)' -@pytest.mark.skipif(skip, reason='Gremlin Server is not running') -class TestDriverRemoteConnectionThreaded: +def test_conns_in_threads(remote_connection): + q = queue.Queue() + child = Thread(target=_executor, args=(q, None)) + child2 = Thread(target=_executor, args=(q, None)) + child.start() + child2.start() + for x in range(2): + success = q.get() + assert success == 'success!' + child.join() + child2.join() - def test_threaded_client(self): - q = queue.Queue() - # Here if we give each thread its own loop there is no problem. - loop1 = ioloop.IOLoop() - loop2 = ioloop.IOLoop() - child = Thread(target=self._executor, args=(q, loop1)) - child2 = Thread(target=self._executor, args=(q, loop2)) - child.start() - child2.start() - for x in range(2): - success = q.get() - assert success == 'success!' - child.join() - child2.join() +def test_conn_in_threads(remote_connection): + q = queue.Queue() + child = Thread(target=_executor, args=(q, remote_connection)) + child2 = Thread(target=_executor, args=(q, remote_connection)) + child.start() + child2.start() + for x in range(2): + success = q.get() + assert success == 'success!' + child.join() + child2.join() - def _executor(self, q, loop): - try: - connection = DriverRemoteConnection( - 'ws://localhost:45940/gremlin', 'g', loop=loop) - g = Graph().traversal().withRemote(connection) - assert len(g.V().toList()) == 6 - except: - q.put(sys.exc_info()[0]) - else: - q.put('success!') - connection.close() +def _executor(q, conn): + if not conn: + conn = DriverRemoteConnection( + 'ws://localhost:45940/gremlin', 'g', pool_size=4) + try: + g = Graph().traversal().withRemote(conn) + future = g.V().promise() + t = future.result() + assert len(t.toList()) == 6 + except: + q.put(sys.exc_info()[0]) + else: + q.put('success!') http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d203b74/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py b/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py index 2437cfd..7bca769 100644 --- a/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py +++ b/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py @@ -141,8 +141,25 @@ class TestGraphSONWriter(TestCase): assert """true""" == self.graphson_writer.writeObject(True) def test_P(self): - assert """{"@type":"g:P","@value":{"predicate":"and","value":[{"@type":"g:P","@value":{"predicate":"or","value":[{"@type":"g:P","@value":{"predicate":"lt","value":"b"}},{"@type":"g:P","@value":{"predicate":"gt","value":"c"}}]}},{"@type":"g:P","@value":{"predicate":"neq","value":"d"}}]}}""" == self.graphson_writer.writeObject( - P.lt("b").or_(P.gt("c")).and_(P.neq("d"))) + result = {'@type': 'g:P', + '@value': { + 'predicate': 'and', + 'value': [{ + '@type': 'g:P', + '@value': { + 'predicate': 'or', + 'value': [{ + '@type': 'g:P', + '@value': {'predicate': 'lt', 'value': 'b'} + }, + {'@type': 'g:P', '@value': {'predicate': 'gt', 'value': 'c'}} + ] + } + }, + {'@type': 'g:P', '@value': {'predicate': 'neq', 'value': 'd'}}]}} + + assert result == json.loads( + self.graphson_writer.writeObject(P.lt("b").or_(P.gt("c")).and_(P.neq("d")))) def test_strategies(self): # we have a proxy model for now given that we don't want to have to have g:XXX all registered on the Gremlin traversal machine (yet)