meged tp32
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/54dad1d1 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/54dad1d1 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/54dad1d1 Branch: refs/heads/master Commit: 54dad1d1ccbba2542134a85e4696778c8a2ab5b0 Parents: d5162ff 77ce515 Author: davebshow <davebs...@gmail.com> Authored: Tue Feb 28 10:47:35 2017 -0500 Committer: davebshow <davebs...@gmail.com> Committed: Tue Feb 28 10:47:35 2017 -0500 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../src/reference/gremlin-applications.asciidoc | 74 +++++ .../upgrade/release-3.2.x-incubating.asciidoc | 31 ++- gremlin-python/pom.xml | 1 + .../python/TraversalSourceGenerator.groovy | 11 +- .../main/jython/gremlin_python/driver/client.py | 120 ++++++++ .../jython/gremlin_python/driver/connection.py | 78 ++++++ .../driver/driver_remote_connection.py | 271 ++++--------------- .../jython/gremlin_python/driver/protocol.py | 101 +++++++ .../gremlin_python/driver/remote_connection.py | 97 +++++-- .../jython/gremlin_python/driver/request.py | 25 ++ .../jython/gremlin_python/driver/resultset.py | 91 +++++++ .../jython/gremlin_python/driver/serializer.py | 129 +++++++++ .../gremlin_python/driver/tornado/__init__.py | 18 ++ .../gremlin_python/driver/tornado/transport.py | 47 ++++ .../jython/gremlin_python/driver/transport.py | 46 ++++ .../jython/gremlin_python/process/traversal.py | 11 +- gremlin-python/src/main/jython/setup.py | 22 +- .../src/main/jython/tests/conftest.py | 75 +++++ .../src/main/jython/tests/driver/test_client.py | 110 ++++++++ .../driver/test_driver_remote_connection.py | 148 +++------- .../test_driver_remote_connection_threaded.py | 83 +++--- .../jython/tests/structure/io/test_graphson.py | 21 +- 23 files changed, 1189 insertions(+), 422 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54dad1d1/CHANGELOG.asciidoc ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54dad1d1/docs/src/reference/gremlin-applications.asciidoc ---------------------------------------------------------------------- diff --cc docs/src/reference/gremlin-applications.asciidoc index 4805272,8e0edc2..2d9a499 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@@ -778,7 -762,81 +778,81 @@@ The above code demonstrates using the ` what classes (from Titan in this case) to auto-register during serialization. Gremlin Server roughly uses this same approach when it configures it's serializers, so using this same model will ensure compatibility when making requests. + [[connecting-via-python]] + Connecting via Python + ~~~~~~~~~~~~~~~~~~~~~ + + [source,python] + ---- + pip install gremlinpython + ---- + + TinkerPop3 also includes a client for Python-based applications. It is referred to as Gremlin-Python Driver. + The `Client` class implementation/interface is based on the Java Driver, with some restrictions. Most notably, + Gremlin-Python does not yet implement the `Cluster` class. Instead, `Client` is instantiated directly. + Usage is as follows: + + [source,python] + ---- + from gremlin_python.driver import client <1> + client = client.Client('ws://localhost:8182/gremlin', 'g') <2> + ---- + + <1> Import the Gremlin-Python `client` module. + <2> Opens a reference to `localhost` - note that there are various configuration options that can be passed + to the `Client` object upon instantiation as keyword arguments. + + Once a `Client` instance is ready, it is possible to issue some Gremlin: + + [source,python] + ---- + result_set = client.submit("[1,2,3,4]") <1> + future_results = result_set.all() <2> + results = future_results.result() <3> + assert results == [1, 2, 3, 4] <4> + + future_result_set = client.submitAsync("[1,2,3,4]") <5> + result_set = future_result_set.result() <6> + result = result_set.one() <7> + assert results == [1, 2, 3, 4] <8> + assert result_set.done.done() <9> + + client.close() <10> + ---- + + <1> Submit a script that simply returns a `List` of integers. This method blocks until the request is written to + the server and a `ResultSet` is constructed. + <2> Even though the `ResultSet` is constructed, it does not mean that the server has sent back the results (or even + evaluated the script potentially). The `ResultSet` is just a holder that is awaiting the results from the server. The `all` method + returns a `concurrent.futures.Future` that resolves to a list when it is complete. + <3> Block until the the script is evaluated and results are sent back by the server. + <4> Verify the result. + <5> Submit the same script to the server but don't block. + <6> Wait until request is written to the server and `ResultSet` is constructed. + <7> Read a single result off the result stream. + <8> Again, verify the result. + <9> Verify that the all results have been read and stream is closed. + <10> Close client and underlying pool connections. + + Configuration + ^^^^^^^^^^^^^ + + The following table describes the various configuration options for the Gremlin-Python Driver. They + can be passed to the `Client` instance as keyword arguments: + + [width="100%",cols="3,10,^2",options="header"] + |========================================================= + |Key |Description |Default + |protocol_factory |A callable that returns an instance of `AbstractBaseProtocol`. |`gremlin_python.driver.protocol.GremlinServerWSProtocol` + |transport_factory |A callable that returns an instance of `AbstractBaseTransport`. |`gremlin_python.driver.tornado.transport.TornadoTransport` + |pool_size |The number of connections used by the pool. |4 + |max_workers |Maximum number of worker threads. |Number of CPUs * 5 + |message_serializer |The message serializer implementation.|`gremlin_python.driver.serializer.GraphSONMessageSerializer` + |password |The password to submit on requests that require authentication. |"" + |username |The username to submit on requests that require authentication. |"" + |========================================================= + -Connecting via REST +Connecting via HTTP ~~~~~~~~~~~~~~~~~~~ image:gremlin-rexster.png[width=225,float=left] While the default behavior for Gremlin Server is to provide a http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54dad1d1/docs/src/upgrade/release-3.2.x-incubating.asciidoc ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54dad1d1/gremlin-python/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54dad1d1/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py ---------------------------------------------------------------------- diff --cc gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py index 7bc792f,1f08e2b..ea4adfd --- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py @@@ -36,216 -26,50 +26,49 @@@ __author__ = 'David M. Brown (davebshow class DriverRemoteConnection(RemoteConnection): - def __init__(self, url, traversal_source, username="", password="", loop=None, graphson_reader=None, graphson_writer=None): - super(DriverRemoteConnection, self).__init__(url, traversal_source) - self._url = url - self._username = username - self._password = password - if loop is None: - loop = ioloop.IOLoop.current() - self._loop = loop - self._websocket = self._loop.run_sync(lambda: websocket.websocket_connect(self.url)) - self._graphson_reader = graphson_reader or GraphSONReader() - self._graphson_writer = graphson_writer or GraphSONWriter() - - def submit(self, bytecode): - ''' - :param bytecode: the bytecode of a traversal to submit to the RemoteConnection - :return: a RemoteTraversal with RemoteTraversalSideEffects - ''' - request_id = str(uuid.uuid4()) - traversers = self._loop.run_sync(lambda: self.submit_traversal_bytecode(request_id, bytecode)) - keys, value, close = self._get_side_effect_lambdas(request_id) - return RemoteTraversal(iter(traversers), RemoteTraversalSideEffects(keys, value, close, self._loop)) - - def submit_async(self, bytecode): - request_id = str(uuid.uuid4()) - future_traversers = self.submit_traversal_bytecode(request_id, bytecode) - keys, value, close = self._get_side_effect_lambdas(request_id) - side_effects = RemoteTraversalSideEffects(keys, value, close, self._loop) - return RemoteTraversal(future_traversers, side_effects) - - @gen.coroutine - def submit_traversal_bytecode(self, request_id, bytecode): - message = { - "requestId": { - "@type": "g:UUID", - "@value": request_id - }, - "op": "bytecode", - "processor": "traversal", - "args": { - "gremlin": self._graphson_writer.toDict(bytecode), - "aliases": {"g": self.traversal_source} - } - } - traversers = yield self._execute_message(message) - raise gen.Return(traversers) - - @gen.coroutine - def submit_sideEffect_keys(self, request_id): - message = { - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "keys", - "processor": "traversal", - "args": { - "sideEffect": { - "@type": "g:UUID", - "@value": request_id - } - } - } - keys = yield self._execute_message(message) - raise gen.Return(set(keys)) - - @gen.coroutine - def submit_sideEffect_value(self, request_id, key): - message = { - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "gather", - "processor": "traversal", - "args": { - "sideEffect": { - "@type": "g:UUID", - "@value": request_id - }, - "sideEffectKey": key, - "aliases": {"g": self.traversal_source} - } - } - try: - value = yield self._execute_message(message) - except: - raise KeyError(key) - raise gen.Return(value) - - @gen.coroutine - def submit_sideEffect_close(self, request_id): - message = { - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "close", - "processor": "traversal", - "args": { - "sideEffect": { - "@type": "g:UUID", - "@value": request_id - } - } - } - result = yield self._execute_message(message) - raise gen.Return(result) - def _get_side_effect_lambdas(self, request_id): - side_effect_keys = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_keys(request_id)) - side_effect_value = lambda key: self._loop.run_sync(lambda: self.submit_sideEffect_value(request_id, key)) - side_effect_close = lambda: self._loop.run_sync(lambda: self.submit_sideEffect_close(request_id)) - return side_effect_keys, side_effect_value, side_effect_close - - @gen.coroutine - def _execute_message(self, send_message): - send_message = b"".join([b"\x21", - b"application/vnd.gremlin-v3.0+json", - json.dumps(send_message, separators=(',', ':')).encode("utf-8")]) - if self._websocket.protocol is None: - self._websocket = yield websocket.websocket_connect(self.url) - self._websocket.write_message(send_message, binary=True) - response = Response(self._websocket, self._username, self._password, self._graphson_reader) - results = None - while True: - recv_message = yield response.receive() - if recv_message is None: - break - aggregateTo = recv_message[0] - # on first message, get the right result data structure - if None == results: - if "list" == aggregateTo: - results = [] - elif "set" == aggregateTo: - results = set() - elif aggregateTo in ["map", "bulkset"]: - results = {} - elif "none" == aggregateTo: - results = None - else: - results = [] - - # if there is no update to a structure, then the item is the result - if results is None: - results = recv_message[1][0] - # updating a map is different than a list or a set - elif isinstance(results, dict): - if "map" == aggregateTo: - for item in recv_message[1]: - results.update(item) - else: - for item in recv_message[1]: - results[item.object] = item.bulk - # flat add list to result list - else: - results += recv_message[1] - raise gen.Return([] if None == results else results) + def __init__(self, url, traversal_source, protocol_factory=None, + transport_factory=None, pool_size=None, max_workers=None, + username="", password="", message_serializer=None, + graphson_reader=None, graphson_writer=None): + if message_serializer is None: + message_serializer = serializer.GraphSONMessageSerializer( + reader=graphson_reader, + writer=graphson_writer) + self._client = client.Client(url, traversal_source, + protocol_factory=protocol_factory, + transport_factory=transport_factory, + pool_size=pool_size, + max_workers=max_workers, + message_serializer=message_serializer, + username=username, + password=password) + self._url = self._client._url + self._traversal_source = self._client._traversal_source def close(self): - self._websocket.close() - + self._client.close() - class Response: - def __init__(self, websocket, username, password, graphson_reader): - self._websocket = websocket - self._username = username - self._password = password - self._closed = False - self._graphson_reader = graphson_reader - - @gen.coroutine - def receive(self): - if self._closed: - return - recv_message = yield self._websocket.read_message() - recv_message = json.loads(recv_message.decode('utf-8')) - status_code = recv_message["status"]["code"] - aggregateTo = recv_message["result"]["meta"].get("aggregateTo", "list") - - # authentification required then - if status_code == 407: - self._websocket.write_message( - b"".join([b"\x21", - b"application/vnd.gremlin-v3.0+json", - json.dumps({ - "requestId": { - "@type": "g:UUID", - "@value": str(uuid.uuid4()) - }, - "op": "authentication", - "processor": "traversal", - "args": { - "sasl": base64.b64encode( - b"".join([b"\x00", self._username.encode("utf-8"), - b"\x00", self._password.encode("utf-8")])).decode() - } - }, separators=(',', ':')).encode("utf-8")]), binary=True) - results = yield self.receive() - raise gen.Return(results) - elif status_code == 204: - self._closed = True - return - elif status_code in [200, 206]: - results = [] - for item in recv_message["result"]["data"]: - results.append(self._graphson_reader.toObject(item)) - if status_code == 200: - self._closed = True - raise gen.Return((aggregateTo, results)) - else: - self._closed = True - raise GremlinServerError( - "{0}: {1}".format(status_code, recv_message["status"]["message"])) + def submit(self, bytecode): + result_set = self._client.submit(bytecode) + results = result_set.all().result() + side_effects = RemoteTraversalSideEffects(result_set.request_id, + self._client) + return RemoteTraversal(iter(results), side_effects) + + def submitAsync(self, bytecode): + future = Future() + future_result_set = self._client.submitAsync(bytecode) + + def cb(f): + try: + result_set = f.result() + except Exception as e: + future.set_exception(e) + else: + results = result_set.all().result() + side_effects = RemoteTraversalSideEffects(result_set.request_id, + self._client) + future.set_result(RemoteTraversal(iter(results), side_effects)) - + future_result_set.add_done_callback(cb) + return future http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54dad1d1/gremlin-python/src/main/jython/gremlin_python/process/traversal.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/54dad1d1/gremlin-python/src/main/jython/tests/structure/io/test_graphson.py ----------------------------------------------------------------------