kenhuuu commented on code in PR #3315:
URL: https://github.com/apache/tinkerpop/pull/3315#discussion_r2933345615
##########
gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs:
##########
@@ -22,306 +22,151 @@
#endregion
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Text;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Gremlin.Net.Driver.Messages;
using Gremlin.Net.Process;
+using Gremlin.Net.Structure.IO;
namespace Gremlin.Net.Driver
{
- internal interface IResponseHandlerForSingleRequestMessage
+ /// <summary>
+ /// HTTP-based connection that sends requests via HTTP POST to Gremlin
Server.
+ /// </summary>
+ internal class Connection : IDisposable
{
- void HandleReceived(ResponseMessage<List<object>> received);
- void Finalize(Dictionary<string, object> statusAttributes);
- void HandleFailure(Exception objException);
- void Cancel();
- }
+ private const string GraphBinaryMimeType =
SerializationTokens.GraphBinary4MimeType;
- internal class Connection : IConnection
- {
- private readonly IMessageSerializer _messageSerializer;
+ private readonly HttpClient _httpClient;
private readonly Uri _uri;
- private readonly IWebSocketConnection _webSocketConnection;
- private readonly string? _username;
- private readonly string? _password;
- private readonly string? _sessionId;
- private readonly bool _sessionEnabled;
-
- private readonly ConcurrentQueue<(RequestMessage msg,
CancellationToken cancellationToken)> _writeQueue = new();
-
- private readonly ConcurrentDictionary<Guid,
IResponseHandlerForSingleRequestMessage> _callbackByRequestId =
- new();
+ private readonly IMessageSerializer _serializer;
+ private readonly bool _enableCompression;
+ private readonly bool _enableUserAgentOnConnect;
+ private readonly bool _bulkResults;
+ // Interceptor slot reserved for future spec
+ // private readonly IReadOnlyList<Func<HttpRequestMessage, Task>>
_interceptors;
- private readonly ConcurrentDictionary<Guid,
CancellationTokenRegistration> _cancellationByRequestId = new();
- private int _connectionState = 0;
- private int _writeInProgress = 0;
- private const int Closed = 1;
-
- public Connection(IWebSocketConnection webSocketConnection, Uri uri,
string? username, string? password,
- IMessageSerializer messageSerializer, string? sessionId)
+ public Connection(Uri uri, IMessageSerializer serializer,
+ ConnectionSettings settings)
{
_uri = uri;
- _username = username;
- _password = password;
- _sessionId = sessionId;
- if (!string.IsNullOrEmpty(sessionId))
- {
- _sessionEnabled = true;
- }
- _messageSerializer = messageSerializer;
- _webSocketConnection = webSocketConnection;
- }
-
- public async Task ConnectAsync(CancellationToken cancellationToken)
- {
- await _webSocketConnection.ConnectAsync(_uri,
cancellationToken).ConfigureAwait(false);
- BeginReceiving();
- }
-
- public int NrRequestsInFlight => _callbackByRequestId.Count;
-
- public bool IsOpen => _webSocketConnection.IsOpen && Volatile.Read(ref
_connectionState) != Closed;
+ _serializer = serializer;
+ _enableCompression = settings.EnableCompression;
+ _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+ _bulkResults = settings.BulkResults;
- public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage
requestMessage, CancellationToken cancellationToken)
- {
- var receiver = new ResponseHandlerForSingleRequestMessage<T>();
- _callbackByRequestId.GetOrAdd(requestMessage.RequestId, receiver);
-
- _cancellationByRequestId.GetOrAdd(requestMessage.RequestId,
cancellationToken.Register(() =>
- {
- if (_callbackByRequestId.TryRemove(requestMessage.RequestId,
out var responseHandler))
- {
- responseHandler.Cancel();
- }
- }));
- _writeQueue.Enqueue((requestMessage, cancellationToken));
- BeginSendingMessages();
- return receiver.Result;
- }
-
- private void BeginReceiving()
- {
- var state = Volatile.Read(ref _connectionState);
- if (state == Closed) return;
- ReceiveMessagesAsync().Forget();
- }
-
- private async Task ReceiveMessagesAsync()
- {
- while (true)
- {
- try
- {
- var received = await
_webSocketConnection.ReceiveMessageAsync().ConfigureAwait(false);
- await HandleReceivedAsync(received).ConfigureAwait(false);
- }
- catch (Exception e)
- {
- await
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
- break;
- }
- }
+#if NET6_0_OR_GREATER
+ var handler = new SocketsHttpHandler
+ {
+ PooledConnectionIdleTimeout = settings.IdleConnectionTimeout,
+ MaxConnectionsPerServer = settings.MaxConnectionsPerServer,
+ ConnectTimeout = settings.ConnectionTimeout,
+ KeepAlivePingTimeout = settings.KeepAliveInterval,
+ };
+ _httpClient = new HttpClient(handler);
+#else
+ _httpClient = new HttpClient();
+ _httpClient.Timeout = settings.ConnectionTimeout;
+#endif
}
- private async Task HandleReceivedAsync(byte[] received)
+ /// <summary>
+ /// Constructor that accepts a pre-configured HttpClient (for
testing).
+ /// </summary>
+ internal Connection(Uri uri, IMessageSerializer serializer,
+ ConnectionSettings settings, HttpClient httpClient)
{
- var receivedMsg = await
_messageSerializer.DeserializeMessageAsync(received).ConfigureAwait(false);
- if (receivedMsg == null)
- {
- throw new InvalidOperationException(
- "Received data deserialized into null object message.
Cannot operate on it.");
- }
-
- try
- {
- HandleReceivedMessage(receivedMsg);
- }
- catch (Exception e)
- {
- if (receivedMsg!.RequestId != null)
- {
-
if(_callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, out var
responseHandler))
- {
- responseHandler?.HandleFailure(e);
-
- }
- if
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var
cancellation))
- {
- cancellation.Dispose();
- }
- }
- }
+ _uri = uri;
+ _serializer = serializer;
+ _enableCompression = settings.EnableCompression;
+ _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+ _bulkResults = settings.BulkResults;
+ _httpClient = httpClient;
}
- private void HandleReceivedMessage(ResponseMessage<List<object>>
receivedMsg)
+ public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage
requestMessage,
+ CancellationToken cancellationToken = default)
{
- var status = receivedMsg.Status;
- status.ThrowIfStatusIndicatesError();
-
- if (status.Code == ResponseStatusCode.Authenticate)
- {
- Authenticate();
- return;
- }
+ var requestBytes = await
_serializer.SerializeMessageAsync(requestMessage, cancellationToken)
+ .ConfigureAwait(false);
- if (receivedMsg.RequestId == null) return;
+ using var content = new ByteArrayContent(requestBytes);
+ content.Headers.ContentType = new
MediaTypeHeaderValue(GraphBinaryMimeType);
- _callbackByRequestId.TryGetValue(receivedMsg.RequestId.Value, out
var responseHandler);
- if (status.Code != ResponseStatusCode.NoContent)
- {
- responseHandler?.HandleReceived(receivedMsg);
- }
+ using var httpRequest = new HttpRequestMessage(HttpMethod.Post,
_uri);
+ httpRequest.Content = content;
+ httpRequest.Headers.Accept.Add(new
MediaTypeWithQualityHeaderValue(GraphBinaryMimeType));
- if (status.Code == ResponseStatusCode.Success || status.Code ==
ResponseStatusCode.NoContent)
+ if (_enableCompression)
{
- if
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var
cancellation))
- {
- cancellation.Dispose();
- }
- responseHandler?.Finalize(status.Attributes);
- _callbackByRequestId.TryRemove(receivedMsg.RequestId.Value,
out _);
+ httpRequest.Headers.AcceptEncoding.Add(new
StringWithQualityHeaderValue("deflate"));
}
- }
-
- private void Authenticate()
- {
- if (string.IsNullOrEmpty(_username) ||
string.IsNullOrEmpty(_password))
- throw new InvalidOperationException(
- $"The Gremlin Server requires authentication, but no
credentials are specified - username: {_username}, password: {_password}.");
-
- var message =
RequestMessage.Build(Tokens.OpsAuthentication).Processor(Tokens.ProcessorTraversal)
- .AddArgument(Tokens.ArgsSasl, SaslArgument()).Create();
- _writeQueue.Enqueue((message, CancellationToken.None));
- BeginSendingMessages();
- }
-
- private string SaslArgument()
- {
- var auth = $"\0{_username}\0{_password}";
- var authBytes = Encoding.UTF8.GetBytes(auth);
- return Convert.ToBase64String(authBytes);
- }
-
- private void BeginSendingMessages()
- {
- if (Interlocked.CompareExchange(ref _writeInProgress, 1, 0) != 0)
- return;
- SendMessagesFromQueueAsync().Forget();
- }
-
- private async Task SendMessagesFromQueueAsync()
- {
- while (_writeQueue.TryDequeue(out var msg))
+ if (_enableUserAgentOnConnect)
{
- try
- {
- await SendMessageAsync(msg.msg,
msg.cancellationToken).ConfigureAwait(false);
- }
- catch (OperationCanceledException e) when
(msg.cancellationToken == e.CancellationToken)
- {
- // Send was cancelled for this message -> silently catch
as we want to continue sending from this
- // connection. The task responsible for submitting this
message will be cancelled by the
- // `ResponseHandlerForSingleRequestMessage`.
- }
- catch (Exception e)
- {
- await
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
- break;
- }
+ httpRequest.Headers.TryAddWithoutValidation("User-Agent",
Utils.UserAgent);
}
- Interlocked.CompareExchange(ref _writeInProgress, 0, 1);
- // Since the loop ended and the write in progress was set to 0
- // a new item could have been added, write queue can contain items
at this time
- if (!_writeQueue.IsEmpty && Interlocked.CompareExchange(ref
_writeInProgress, 1, 0) == 0)
+ if (_bulkResults)
{
- await SendMessagesFromQueueAsync().ConfigureAwait(false);
+ httpRequest.Headers.Add("bulkResults", "true");
}
- }
- private async Task CloseConnectionBecauseOfFailureAsync(Exception
exception)
- {
- EmptyWriteQueue();
- await CloseAsync().ConfigureAwait(false);
- NotifyAboutConnectionFailure(exception);
- }
+ // Future: apply interceptors here
- private void EmptyWriteQueue()
- {
- while (_writeQueue.TryDequeue(out _))
- {
- }
- }
+ using var response = await _httpClient.SendAsync(httpRequest,
cancellationToken)
+ .ConfigureAwait(false);
- private void NotifyAboutConnectionFailure(Exception exception)
- {
- foreach (var cb in _callbackByRequestId.Values)
+ // If the HTTP status indicates an error and the response is not
GraphBinary
+ // (e.g. a proxy 502 or server 404 returning HTML/plain text),
fail fast with
+ // a clear message instead of letting the deserializer throw a
confusing parse error.
+ // When the Content-Type matches GraphBinary, fall through to
normal deserialization
+ // so the status footer in the GB4 response can surface the
application-level error.
+ if (!response.IsSuccessStatusCode &&
+ response.Content.Headers.ContentType?.MediaType !=
GraphBinaryMimeType)
Review Comment:
I don't think this is actually correct. The server sometimes isn't able to
respond with GraphBinary in some cases but it still tries to respond with a
valid JSON that has a parseable error field. Its actually only if the response
isn't in that valid JSON that you shouldn't attempt to parse.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]