FlorianHockmann commented on a change in pull request #1016: Add request 
pipelining and a fixed ConnectionPool size TINKERPOP-1775 and TINKERPOP-1774
URL: https://github.com/apache/tinkerpop/pull/1016#discussion_r244577759
 
 

 ##########
 File path: gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
 ##########
 @@ -55,87 +71,86 @@ internal class Connection : IConnection
             _webSocketConnection = new 
WebSocketConnection(webSocketConfiguration);
         }
 
-        public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage)
-        {
-            await SendAsync(requestMessage).ConfigureAwait(false);
-            return await ReceiveAsync<T>().ConfigureAwait(false);
-        }
-
         public async Task ConnectAsync()
         {
             await 
_webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false);
+            BeginReceiving();
         }
 
-        public async Task CloseAsync()
-        {
-            await _webSocketConnection.CloseAsync().ConfigureAwait(false);
-        }
+        public int NrRequestsInFlight => _callbackByRequestId.Count;
 
         public bool IsOpen => _webSocketConnection.IsOpen;
 
-        private async Task SendAsync(RequestMessage message)
+        public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
         {
-            var graphsonMsg = _graphSONWriter.WriteObject(message);
-            var serializedMsg = 
_messageSerializer.SerializeMessage(graphsonMsg);
-            await 
_webSocketConnection.SendMessageAsync(serializedMsg).ConfigureAwait(false);
+            var receiver = new 
ResponseHandlerForSingleRequestMessage<T>(_graphSONReader);
+            _callbackByRequestId.GetOrAdd(requestMessage.RequestId, receiver);
+            _writeQueue.Enqueue(requestMessage);
+            BeginSendingMessages();
+            return receiver.Result;
         }
 
-        private async Task<ResultSet<T>> ReceiveAsync<T>()
+        private void BeginReceiving()
         {
-            ResultSet<T> resultSet = null;
-            Dictionary<string, object> statusAttributes = null;
-            ResponseStatus status;
-            IAggregator aggregator = null;
-            var isAggregatingSideEffects = false;
-            var result = new List<T>();
-            do
-            {
-                var received = await 
_webSocketConnection.ReceiveMessageAsync().ConfigureAwait(false);
-                var receivedMsg = 
_messageSerializer.DeserializeMessage<ResponseMessage<JToken>>(received);
-
-                status = receivedMsg.Status;
-                status.ThrowIfStatusIndicatesError();
+            var state = Volatile.Read(ref _connectionState);
+            if (state == Closed) return;
+            ReceiveMessagesAsync().Forget();
+        }
 
-                if (status.Code == ResponseStatusCode.Authenticate)
+        private async Task ReceiveMessagesAsync()
+        {
+            while (true)
+            {
+                try
                 {
-                    await AuthenticateAsync().ConfigureAwait(false);
+                    var received = await 
_webSocketConnection.ReceiveMessageAsync().ConfigureAwait(false);
+                    Parse(received);
                 }
-                else if (status.Code != ResponseStatusCode.NoContent)
+                catch (Exception e)
                 {
-                    var receivedData = typeof(T) == typeof(JToken)
-                        ? new[] { receivedMsg.Result.Data }
-                        : _graphSONReader.ToObject(receivedMsg.Result.Data);
-
-                    foreach (var d in receivedData)
-                        if 
(receivedMsg.Result.Meta.ContainsKey(Tokens.ArgsSideEffectKey))
-                        {
-                            if (aggregator == null)
-                                aggregator =
-                                    new AggregatorFactory().GetAggregatorFor(
-                                        (string) 
receivedMsg.Result.Meta[Tokens.ArgsAggregateTo]);
-                            aggregator.Add(d);
-                            isAggregatingSideEffects = true;
-                        }
-                        else
-                        {
-                            result.Add(d);
-                        }
+                    await 
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
+                    break;
                 }
+            }
+        }
 
-                if (status.Code == ResponseStatusCode.Success || status.Code 
== ResponseStatusCode.NoContent)
-                {
-                    statusAttributes = receivedMsg.Status.Attributes;
-                }
+        private void Parse(byte[] received)
+        {
+            var receivedMsg = 
_messageSerializer.DeserializeMessage<ResponseMessage<JToken>>(received);
+            
+            try
+            {
+                TryParseResponseMessage(receivedMsg);
+            }
+            catch (Exception e)
+            {
+                _callbackByRequestId[receivedMsg.RequestId].HandleFailure(e);
 
 Review comment:
   Oh yes, kind of obvious. I just pushed a commit with this change.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to