Implemented a promise API for gremlin-python. Side-effect retrieval is still problematic when called from inside a coroutine.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b0b93308 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b0b93308 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b0b93308 Branch: refs/heads/TINKERPOP-1130 Commit: b0b933084b38f77fcd4843796980cabe313ed1e4 Parents: f02e183 Author: davebshow <[email protected]> Authored: Wed Dec 7 13:35:42 2016 -0500 Committer: Stephen Mallette <[email protected]> Committed: Fri Dec 16 10:03:41 2016 -0500 ---------------------------------------------------------------------- .../python/TraversalSourceGenerator.groovy | 27 +++++++++ .../driver/driver_remote_connection.py | 21 +++++-- .../gremlin_python/driver/remote_connection.py | 7 +++ .../jython/gremlin_python/process/traversal.py | 28 +++++++++- .../driver/test_driver_remote_connection.py | 59 ++++++++++++++++++++ 5 files changed, 137 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy index 73ffcb6..fc76b71 100644 --- a/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy +++ b/gremlin-python/src/main/groovy/org/apache/tinkerpop/gremlin/python/TraversalSourceGenerator.groovy @@ -114,6 +114,28 @@ class Traversal(object): except StopIteration: return tempList tempList.append(temp) return tempList + def promise(self, cb=None): + self.traversal_strategies.apply_async_strategies(self) + future_traversers = self.traversers + future = 
type(future_traversers)() + def process(f): + try: + traversers = f.result() + except Exception as e: + future.set_exception(e) + else: + self.traversers = iter(traversers) + if cb: + try: + result = cb(self) + except Exception as e: + future.set_exception(e) + else: + future.set_result(result) + else: + future.set_result(self) + future_traversers.add_done_callback(process) + return future """) @@ -223,6 +245,9 @@ class TraversalStrategies(object): def apply_strategies(self, traversal): for traversal_strategy in self.traversal_strategies: traversal_strategy.apply(traversal) + def apply_async_strategies(self, traversal): + for traversal_strategy in self.traversal_strategies: + traversal_strategy.apply_async(traversal) def __repr__(self): return str(self.traversal_strategies) @@ -233,6 +258,8 @@ class TraversalStrategy(object): self.configuration = {} if configuration is None else configuration def apply(self, traversal): return + def apply_async(self, traversal): + return def __eq__(self, other): return isinstance(other, self.__class__) def __hash__(self): http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py index d975f60..babb113 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py @@ -17,9 +17,11 @@ specific language governing permissions and limitations under the License. 
""" import base64 +import functools import json import uuid from tornado import gen +from tornado import concurrent from tornado import ioloop from tornado import websocket @@ -51,10 +53,15 @@ class DriverRemoteConnection(RemoteConnection): ''' request_id = str(uuid.uuid4()) traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode)) - side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id)) - side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key)) - side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id)) - return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(side_effect_keys, side_effect_value, side_effect_close)) + keys, value, close = self._get_side_effect_lambdas(request_id) + return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys, value, close)) + + def submit_async(self, bytecode): + request_id = str(uuid.uuid4()) + future_traversers = self.submit_traversal_bytecode(request_id, bytecode) + keys, value, close = self._get_side_effect_lambdas(request_id) + side_effects = RemoteTraversalSideEffects(keys, value, close) + return RemoteTraversal(future_traversers, side_effects) @gen.coroutine def submit_traversal_bytecode(self, request_id, bytecode): @@ -135,6 +142,12 @@ class DriverRemoteConnection(RemoteConnection): result = yield self._execute_message(message) raise gen.Return(result) + def _get_side_effect_lambdas(self, request_id): + side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id)) + side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key)) + side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id)) + return side_effect_keys, side_effect_value, side_effect_close + @gen.coroutine def _execute_message(self, send_message): 
send_message = b"".join([b"\x21", http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py index 46fb760..93c92b7 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py @@ -95,3 +95,10 @@ class RemoteStrategy(TraversalStrategy): remote_traversal = self.remote_connection.submit(traversal.bytecode) traversal.side_effects = remote_traversal.side_effects traversal.traversers = remote_traversal.traversers + + def apply_async(self, traversal): + if traversal.traversers is None: + remote_traversal = self.remote_connection.submit_async( + traversal.bytecode) + traversal.side_effects = remote_traversal.side_effects + traversal.traversers = remote_traversal.traversers http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/gremlin_python/process/traversal.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py index a7b6118..2c2db59 100644 --- a/gremlin-python/src/main/jython/gremlin_python/process/traversal.py +++ b/gremlin-python/src/main/jython/gremlin_python/process/traversal.py @@ -77,6 +77,28 @@ class Traversal(object): except StopIteration: return tempList tempList.append(temp) return tempList + def promise(self, cb=None): + self.traversal_strategies.apply_async_strategies(self) + future_traversers = self.traversers + future = type(future_traversers)() + def process(f): + try: + traversers = f.result() + except Exception as e: + 
future.set_exception(e) + else: + self.traversers = iter(traversers) + if cb: + try: + result = cb(self) + except Exception as e: + future.set_exception(e) + else: + future.set_result(result) + else: + future.set_result(self) + future_traversers.add_done_callback(process) + return future Barrier = Enum('Barrier', 'normSack') @@ -286,6 +308,9 @@ class TraversalStrategies(object): def apply_strategies(self, traversal): for traversal_strategy in self.traversal_strategies: traversal_strategy.apply(traversal) + def apply_async_strategies(self, traversal): + for traversal_strategy in self.traversal_strategies: + traversal_strategy.apply_async(traversal) def __repr__(self): return str(self.traversal_strategies) @@ -296,6 +321,8 @@ class TraversalStrategy(object): self.configuration = {} if configuration is None else configuration def apply(self, traversal): return + def apply_async(self, traversal): + return def __eq__(self, other): return isinstance(other, self.__class__) def __hash__(self): @@ -379,4 +406,3 @@ class Binding(object): return hash(self.key) + hash(self.value) def __repr__(self): return "binding[" + self.key + "=" + str(self.value) + "]" - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b0b93308/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py index 6b057d5..c9e64c5 100644 --- a/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py +++ b/gremlin-python/src/main/jython/tests/driver/test_driver_remote_connection.py @@ -24,6 +24,8 @@ from unittest import TestCase import pytest +from tornado import ioloop, gen + from gremlin_python import statics from gremlin_python.statics import long from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection @@ 
-193,6 +195,63 @@ class TestDriverRemoteConnection(TestCase): t.side_effects.value_lambda('b') connection.close() + def test_promise(self): + loop = ioloop.IOLoop.current() + connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') + g = Graph().traversal().withRemote(connection) + + @gen.coroutine + def go(): + future_traversal = g.V().promise(lambda x: x.toList()) + assert not future_traversal.done() + resp = yield future_traversal + assert future_traversal.done() + assert len(resp) == 6 + count = yield g.V().count().promise(lambda x: x.next()) + assert count == 6 + + loop.run_sync(go) + + def test_promise_side_effects(self): + loop = ioloop.IOLoop.current() + connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') + g = Graph().traversal().withRemote(connection) + + # Side effects are problematic in coroutines. + # Because they are designed to be synchronous (calling `run_sync`) + # they result in an error if called from a coroutine because + # the event loop is already running + @gen.coroutine + def go(): + traversal = yield g.V().aggregate('a').promise() + # Trying to get side effect keys throws error - BAD + with pytest.raises(RuntimeError): + keys = traversal.side_effects.keys() + # IOLoop is now hosed. + + loop.run_sync(go) + + # Get a new IOLoop - this should happen for each test case. + connection.close() + ioloop.IOLoop.clear_instance() + loop.close() + loop = ioloop.IOLoop() + loop.make_current() + + connection = DriverRemoteConnection('ws://localhost:45940/gremlin', 'g') + g = Graph().traversal().withRemote(connection) + + # If we return the traversal though, we can use side effects per usual. + @gen.coroutine + def go(): + traversal = yield g.V().aggregate('a').promise() + raise gen.Return(traversal) # Weird legacy Python compatible idiom + + # See, normal side effects. + traversal = loop.run_sync(go) + a, = traversal.side_effects.keys() + assert a == 'a' + if __name__ == '__main__': test = False
