kenhuuu commented on code in PR #3315:
URL: https://github.com/apache/tinkerpop/pull/3315#discussion_r2933345615


##########
gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs:
##########
@@ -22,306 +22,151 @@
 #endregion
 
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
-using System.Text;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Http.Headers;
 using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 using Gremlin.Net.Process;
+using Gremlin.Net.Structure.IO;
 
 namespace Gremlin.Net.Driver
 {
-    internal interface IResponseHandlerForSingleRequestMessage
+    /// <summary>
+    ///     HTTP-based connection that sends requests via HTTP POST to Gremlin 
Server.
+    /// </summary>
+    internal class Connection : IDisposable
     {
-        void HandleReceived(ResponseMessage<List<object>> received);
-        void Finalize(Dictionary<string, object> statusAttributes);
-        void HandleFailure(Exception objException);
-        void Cancel();
-    }
+        private const string GraphBinaryMimeType = 
SerializationTokens.GraphBinary4MimeType;
 
-    internal class Connection : IConnection
-    {
-        private readonly IMessageSerializer _messageSerializer;
+        private readonly HttpClient _httpClient;
         private readonly Uri _uri;
-        private readonly IWebSocketConnection _webSocketConnection;
-        private readonly string? _username;
-        private readonly string? _password;
-        private readonly string? _sessionId;
-        private readonly bool _sessionEnabled;
-
-        private readonly ConcurrentQueue<(RequestMessage msg, 
CancellationToken cancellationToken)> _writeQueue = new();
-
-        private readonly ConcurrentDictionary<Guid, 
IResponseHandlerForSingleRequestMessage> _callbackByRequestId =
-            new();
+        private readonly IMessageSerializer _serializer;
+        private readonly bool _enableCompression;
+        private readonly bool _enableUserAgentOnConnect;
+        private readonly bool _bulkResults;
+        // Interceptor slot reserved for future spec
+        // private readonly IReadOnlyList<Func<HttpRequestMessage, Task>> 
_interceptors;
 
-        private readonly ConcurrentDictionary<Guid, 
CancellationTokenRegistration> _cancellationByRequestId = new();
-        private int _connectionState = 0;
-        private int _writeInProgress = 0;
-        private const int Closed = 1;
-
-        public Connection(IWebSocketConnection webSocketConnection, Uri uri, 
string? username, string? password,
-            IMessageSerializer messageSerializer, string? sessionId)
+        public Connection(Uri uri, IMessageSerializer serializer,
+            ConnectionSettings settings)
         {
             _uri = uri;
-            _username = username;
-            _password = password;
-            _sessionId = sessionId;
-            if (!string.IsNullOrEmpty(sessionId))
-            {
-                _sessionEnabled = true;
-            }
-            _messageSerializer = messageSerializer;
-            _webSocketConnection = webSocketConnection;
-        }
-
-        public async Task ConnectAsync(CancellationToken cancellationToken)
-        {
-            await _webSocketConnection.ConnectAsync(_uri, 
cancellationToken).ConfigureAwait(false);
-            BeginReceiving();
-        }
-
-        public int NrRequestsInFlight => _callbackByRequestId.Count;
-
-        public bool IsOpen => _webSocketConnection.IsOpen && Volatile.Read(ref 
_connectionState) != Closed;
+            _serializer = serializer;
+            _enableCompression = settings.EnableCompression;
+            _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+            _bulkResults = settings.BulkResults;
 
-        public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage, CancellationToken cancellationToken)
-        {
-            var receiver = new ResponseHandlerForSingleRequestMessage<T>();
-            _callbackByRequestId.GetOrAdd(requestMessage.RequestId, receiver);
-
-            _cancellationByRequestId.GetOrAdd(requestMessage.RequestId, 
cancellationToken.Register(() =>
-            {
-                if (_callbackByRequestId.TryRemove(requestMessage.RequestId, 
out var responseHandler))
-                {
-                    responseHandler.Cancel();
-                }
-            }));
-            _writeQueue.Enqueue((requestMessage, cancellationToken));
-            BeginSendingMessages();
-            return receiver.Result;
-        }
-
-        private void BeginReceiving()
-        {
-            var state = Volatile.Read(ref _connectionState);
-            if (state == Closed) return;
-            ReceiveMessagesAsync().Forget();
-        }
-
-        private async Task ReceiveMessagesAsync()
-        {
-            while (true)
-            {
-                try
-                {
-                    var received = await 
_webSocketConnection.ReceiveMessageAsync().ConfigureAwait(false);
-                    await HandleReceivedAsync(received).ConfigureAwait(false);
-                }
-                catch (Exception e)
-                {
-                    await 
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
-                    break;
-                }
-            }
+#if NET6_0_OR_GREATER
+            var handler = new SocketsHttpHandler
+            {
+                PooledConnectionIdleTimeout = settings.IdleConnectionTimeout,
+                MaxConnectionsPerServer = settings.MaxConnectionsPerServer,
+                ConnectTimeout = settings.ConnectionTimeout,
+                KeepAlivePingTimeout = settings.KeepAliveInterval,
+            };
+            _httpClient = new HttpClient(handler);
+#else
+            _httpClient = new HttpClient();
+            _httpClient.Timeout = settings.ConnectionTimeout;
+#endif
         }
 
-        private async Task HandleReceivedAsync(byte[] received)
+        /// <summary>
+        ///     Constructor that accepts a pre-configured HttpClient (for 
testing).
+        /// </summary>
+        internal Connection(Uri uri, IMessageSerializer serializer,
+            ConnectionSettings settings, HttpClient httpClient)
         {
-            var receivedMsg = await 
_messageSerializer.DeserializeMessageAsync(received).ConfigureAwait(false);
-            if (receivedMsg == null)
-            {
-                throw new InvalidOperationException(
-                    "Received data deserialized into null object message. 
Cannot operate on it.");
-            }
-
-            try
-            {
-                HandleReceivedMessage(receivedMsg);
-            }
-            catch (Exception e)
-            {
-                if (receivedMsg!.RequestId != null)
-                {
-                    
if(_callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, out var 
responseHandler))
-                    {
-                        responseHandler?.HandleFailure(e);
-                        
-                    }
-                    if 
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var 
cancellation))
-                    {
-                        cancellation.Dispose();
-                    }
-                }
-            }
+            _uri = uri;
+            _serializer = serializer;
+            _enableCompression = settings.EnableCompression;
+            _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+            _bulkResults = settings.BulkResults;
+            _httpClient = httpClient;
         }
 
-        private void HandleReceivedMessage(ResponseMessage<List<object>> 
receivedMsg)
+        public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage,
+            CancellationToken cancellationToken = default)
         {
-            var status = receivedMsg.Status;
-            status.ThrowIfStatusIndicatesError();
-
-            if (status.Code == ResponseStatusCode.Authenticate)
-            {
-                Authenticate();
-                return;
-            }
+            var requestBytes = await 
_serializer.SerializeMessageAsync(requestMessage, cancellationToken)
+                .ConfigureAwait(false);
 
-            if (receivedMsg.RequestId == null) return;
+            using var content = new ByteArrayContent(requestBytes);
+            content.Headers.ContentType = new 
MediaTypeHeaderValue(GraphBinaryMimeType);
 
-            _callbackByRequestId.TryGetValue(receivedMsg.RequestId.Value, out 
var responseHandler);
-            if (status.Code != ResponseStatusCode.NoContent)
-            {
-                responseHandler?.HandleReceived(receivedMsg);
-            }
+            using var httpRequest = new HttpRequestMessage(HttpMethod.Post, 
_uri);
+            httpRequest.Content = content;
+            httpRequest.Headers.Accept.Add(new 
MediaTypeWithQualityHeaderValue(GraphBinaryMimeType));
 
-            if (status.Code == ResponseStatusCode.Success || status.Code == 
ResponseStatusCode.NoContent)
+            if (_enableCompression)
             {
-                if 
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var 
cancellation))
-                {
-                    cancellation.Dispose();
-                }
-                responseHandler?.Finalize(status.Attributes);
-                _callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, 
out _);
+                httpRequest.Headers.AcceptEncoding.Add(new 
StringWithQualityHeaderValue("deflate"));
             }
-        }
-
-        private void Authenticate()
-        {
-            if (string.IsNullOrEmpty(_username) || 
string.IsNullOrEmpty(_password))
-                throw new InvalidOperationException(
-                    $"The Gremlin Server requires authentication, but no 
credentials are specified - username: {_username}, password: {_password}.");
-
-            var message = 
RequestMessage.Build(Tokens.OpsAuthentication).Processor(Tokens.ProcessorTraversal)
-                .AddArgument(Tokens.ArgsSasl, SaslArgument()).Create();
 
-            _writeQueue.Enqueue((message, CancellationToken.None));
-            BeginSendingMessages();
-        }
-
-        private string SaslArgument()
-        {
-            var auth = $"\0{_username}\0{_password}";
-            var authBytes = Encoding.UTF8.GetBytes(auth);
-            return Convert.ToBase64String(authBytes);
-        }
-
-        private void BeginSendingMessages()
-        {
-            if (Interlocked.CompareExchange(ref _writeInProgress, 1, 0) != 0)
-                return;
-            SendMessagesFromQueueAsync().Forget();
-        }
-
-        private async Task SendMessagesFromQueueAsync()
-        {
-            while (_writeQueue.TryDequeue(out var msg))
+            if (_enableUserAgentOnConnect)
             {
-                try
-                {
-                    await SendMessageAsync(msg.msg, 
msg.cancellationToken).ConfigureAwait(false);
-                }
-                catch (OperationCanceledException e) when 
(msg.cancellationToken == e.CancellationToken)
-                {
-                    // Send was cancelled for this message -> silently catch 
as we want to continue sending from this
-                    //  connection. The task responsible for submitting this 
message will be cancelled by the
-                    //  `ResponseHandlerForSingleRequestMessage`.
-                }
-                catch (Exception e)
-                {
-                    await 
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
-                    break;
-                }
+                httpRequest.Headers.TryAddWithoutValidation("User-Agent", 
Utils.UserAgent);
             }
-            Interlocked.CompareExchange(ref _writeInProgress, 0, 1);
 
-            // Since the loop ended and the write in progress was set to 0
-            // a new item could have been added, write queue can contain items 
at this time
-            if (!_writeQueue.IsEmpty && Interlocked.CompareExchange(ref 
_writeInProgress, 1, 0) == 0)
+            if (_bulkResults)
             {
-                await SendMessagesFromQueueAsync().ConfigureAwait(false);
+                httpRequest.Headers.Add("bulkResults", "true");
             }
-        }
 
-        private async Task CloseConnectionBecauseOfFailureAsync(Exception 
exception)
-        {
-            EmptyWriteQueue();
-            await CloseAsync().ConfigureAwait(false);
-            NotifyAboutConnectionFailure(exception);
-        }
+            // Future: apply interceptors here
 
-        private void EmptyWriteQueue()
-        {
-            while (_writeQueue.TryDequeue(out _))
-            {
-            }
-        }
+            using var response = await _httpClient.SendAsync(httpRequest, 
cancellationToken)
+                .ConfigureAwait(false);
 
-        private void NotifyAboutConnectionFailure(Exception exception)
-        {
-            foreach (var cb in _callbackByRequestId.Values)
+            // If the HTTP status indicates an error and the response is not 
GraphBinary
+            // (e.g. a proxy 502 or server 404 returning HTML/plain text), 
fail fast with
+            // a clear message instead of letting the deserializer throw a 
confusing parse error.
+            // When the Content-Type matches GraphBinary, fall through to 
normal deserialization
+            // so the status footer in the GB4 response can surface the 
application-level error.
+            if (!response.IsSuccessStatusCode &&
+                response.Content.Headers.ContentType?.MediaType != 
GraphBinaryMimeType)

Review Comment:
   I don't think this is actually correct. In some cases the server isn't able 
to respond with GraphBinary, but it still tries to respond with valid JSON 
that has a parseable error field. It's actually only when the response isn't 
that valid JSON that you shouldn't attempt to parse it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to