jorgebay commented on a change in pull request #1016: WIP: Add request
pipelining and ConnectionPool sizes TINKERPOP-1775 and TINKERPOP-1774
URL: https://github.com/apache/tinkerpop/pull/1016#discussion_r242882720
##########
File path: gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs
##########
@@ -55,96 +71,82 @@ internal class Connection : IConnection
_webSocketConnection = new
WebSocketConnection(webSocketConfiguration);
}
- public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage
requestMessage)
- {
- await SendAsync(requestMessage).ConfigureAwait(false);
- return await ReceiveAsync<T>().ConfigureAwait(false);
- }
-
public async Task ConnectAsync()
{
await
_webSocketConnection.ConnectAsync(_uri).ConfigureAwait(false);
+ ReceiveNext();
}
- public async Task CloseAsync()
- {
- await _webSocketConnection.CloseAsync().ConfigureAwait(false);
- }
+ public int NrRequestsInFlight => _callbackByRequestId.Count;
public bool IsOpen => _webSocketConnection.IsOpen;
- private async Task SendAsync(RequestMessage message)
+ public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
{
- var graphsonMsg = _graphSONWriter.WriteObject(message);
- var serializedMsg =
_messageSerializer.SerializeMessage(graphsonMsg);
- await
_webSocketConnection.SendMessageAsync(serializedMsg).ConfigureAwait(false);
+ var receiver = new
ResponseHandlerForSingleRequestMessage<T>(_graphSONReader);
+ _callbackByRequestId.GetOrAdd(requestMessage.RequestId, receiver);
+ _writeQueue.Enqueue(requestMessage);
+ SendNextMessage();
+ return receiver.Result;
}
- private async Task<ResultSet<T>> ReceiveAsync<T>()
+ private void ReceiveNext()
{
- ResultSet<T> resultSet = null;
- Dictionary<string, object> statusAttributes = null;
- ResponseStatus status;
- IAggregator aggregator = null;
- var isAggregatingSideEffects = false;
- var result = new List<T>();
- do
- {
- var received = await
_webSocketConnection.ReceiveMessageAsync().ConfigureAwait(false);
- var receivedMsg =
_messageSerializer.DeserializeMessage<ResponseMessage<JToken>>(received);
-
- status = receivedMsg.Status;
- status.ThrowIfStatusIndicatesError();
-
- if (status.Code == ResponseStatusCode.Authenticate)
- {
- await AuthenticateAsync().ConfigureAwait(false);
- }
- else if (status.Code != ResponseStatusCode.NoContent)
+ var state = Interlocked.CompareExchange(ref _connectionState, 0,
0);
Review comment:
This check is nice to have, but given that the connection might switch to
closed immediately after this call, I think there is no point in doing a CAS
operation here.
A `Volatile.Read()` would do it and it's order of magnitude cheaper :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services