kenhuuu commented on code in PR #3315:
URL: https://github.com/apache/tinkerpop/pull/3315#discussion_r2933360284


##########
gremlin-dotnet/src/Gremlin.Net/Driver/Connection.cs:
##########
@@ -22,306 +22,151 @@
 #endregion
 
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
-using System.Text;
+using System.IO;
+using System.IO.Compression;
+using System.Linq;
+using System.Net.Http;
+using System.Net.Http.Headers;
 using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Messages;
 using Gremlin.Net.Process;
+using Gremlin.Net.Structure.IO;
 
 namespace Gremlin.Net.Driver
 {
-    internal interface IResponseHandlerForSingleRequestMessage
+    /// <summary>
+    ///     HTTP-based connection that sends requests via HTTP POST to Gremlin 
Server.
+    /// </summary>
+    internal class Connection : IDisposable
     {
-        void HandleReceived(ResponseMessage<List<object>> received);
-        void Finalize(Dictionary<string, object> statusAttributes);
-        void HandleFailure(Exception objException);
-        void Cancel();
-    }
+        private const string GraphBinaryMimeType = 
SerializationTokens.GraphBinary4MimeType;
 
-    internal class Connection : IConnection
-    {
-        private readonly IMessageSerializer _messageSerializer;
+        private readonly HttpClient _httpClient;
         private readonly Uri _uri;
-        private readonly IWebSocketConnection _webSocketConnection;
-        private readonly string? _username;
-        private readonly string? _password;
-        private readonly string? _sessionId;
-        private readonly bool _sessionEnabled;
-
-        private readonly ConcurrentQueue<(RequestMessage msg, 
CancellationToken cancellationToken)> _writeQueue = new();
-
-        private readonly ConcurrentDictionary<Guid, 
IResponseHandlerForSingleRequestMessage> _callbackByRequestId =
-            new();
+        private readonly IMessageSerializer _serializer;
+        private readonly bool _enableCompression;
+        private readonly bool _enableUserAgentOnConnect;
+        private readonly bool _bulkResults;
+        // Interceptor slot reserved for future spec
+        // private readonly IReadOnlyList<Func<HttpRequestMessage, Task>> 
_interceptors;
 
-        private readonly ConcurrentDictionary<Guid, 
CancellationTokenRegistration> _cancellationByRequestId = new();
-        private int _connectionState = 0;
-        private int _writeInProgress = 0;
-        private const int Closed = 1;
-
-        public Connection(IWebSocketConnection webSocketConnection, Uri uri, 
string? username, string? password,
-            IMessageSerializer messageSerializer, string? sessionId)
+        public Connection(Uri uri, IMessageSerializer serializer,
+            ConnectionSettings settings)
         {
             _uri = uri;
-            _username = username;
-            _password = password;
-            _sessionId = sessionId;
-            if (!string.IsNullOrEmpty(sessionId))
-            {
-                _sessionEnabled = true;
-            }
-            _messageSerializer = messageSerializer;
-            _webSocketConnection = webSocketConnection;
-        }
-
-        public async Task ConnectAsync(CancellationToken cancellationToken)
-        {
-            await _webSocketConnection.ConnectAsync(_uri, 
cancellationToken).ConfigureAwait(false);
-            BeginReceiving();
-        }
-
-        public int NrRequestsInFlight => _callbackByRequestId.Count;
-
-        public bool IsOpen => _webSocketConnection.IsOpen && Volatile.Read(ref 
_connectionState) != Closed;
+            _serializer = serializer;
+            _enableCompression = settings.EnableCompression;
+            _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+            _bulkResults = settings.BulkResults;
 
-        public Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage, CancellationToken cancellationToken)
-        {
-            var receiver = new ResponseHandlerForSingleRequestMessage<T>();
-            _callbackByRequestId.GetOrAdd(requestMessage.RequestId, receiver);
-
-            _cancellationByRequestId.GetOrAdd(requestMessage.RequestId, 
cancellationToken.Register(() =>
-            {
-                if (_callbackByRequestId.TryRemove(requestMessage.RequestId, 
out var responseHandler))
-                {
-                    responseHandler.Cancel();
-                }
-            }));
-            _writeQueue.Enqueue((requestMessage, cancellationToken));
-            BeginSendingMessages();
-            return receiver.Result;
-        }
-
-        private void BeginReceiving()
-        {
-            var state = Volatile.Read(ref _connectionState);
-            if (state == Closed) return;
-            ReceiveMessagesAsync().Forget();
-        }
-
-        private async Task ReceiveMessagesAsync()
-        {
-            while (true)
-            {
-                try
-                {
-                    var received = await 
_webSocketConnection.ReceiveMessageAsync().ConfigureAwait(false);
-                    await HandleReceivedAsync(received).ConfigureAwait(false);
-                }
-                catch (Exception e)
-                {
-                    await 
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
-                    break;
-                }
-            }
+#if NET6_0_OR_GREATER
+            var handler = new SocketsHttpHandler
+            {
+                PooledConnectionIdleTimeout = settings.IdleConnectionTimeout,
+                MaxConnectionsPerServer = settings.MaxConnectionsPerServer,
+                ConnectTimeout = settings.ConnectionTimeout,
+                KeepAlivePingTimeout = settings.KeepAliveInterval,
+            };
+            _httpClient = new HttpClient(handler);
+#else
+            _httpClient = new HttpClient();
+            _httpClient.Timeout = settings.ConnectionTimeout;
+#endif
         }
 
-        private async Task HandleReceivedAsync(byte[] received)
+        /// <summary>
+        ///     Constructor that accepts a pre-configured HttpClient (for 
testing).
+        /// </summary>
+        internal Connection(Uri uri, IMessageSerializer serializer,
+            ConnectionSettings settings, HttpClient httpClient)
         {
-            var receivedMsg = await 
_messageSerializer.DeserializeMessageAsync(received).ConfigureAwait(false);
-            if (receivedMsg == null)
-            {
-                throw new InvalidOperationException(
-                    "Received data deserialized into null object message. 
Cannot operate on it.");
-            }
-
-            try
-            {
-                HandleReceivedMessage(receivedMsg);
-            }
-            catch (Exception e)
-            {
-                if (receivedMsg!.RequestId != null)
-                {
-                    
if(_callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, out var 
responseHandler))
-                    {
-                        responseHandler?.HandleFailure(e);
-                        
-                    }
-                    if 
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var 
cancellation))
-                    {
-                        cancellation.Dispose();
-                    }
-                }
-            }
+            _uri = uri;
+            _serializer = serializer;
+            _enableCompression = settings.EnableCompression;
+            _enableUserAgentOnConnect = settings.EnableUserAgentOnConnect;
+            _bulkResults = settings.BulkResults;
+            _httpClient = httpClient;
         }
 
-        private void HandleReceivedMessage(ResponseMessage<List<object>> 
receivedMsg)
+        public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage 
requestMessage,
+            CancellationToken cancellationToken = default)
         {
-            var status = receivedMsg.Status;
-            status.ThrowIfStatusIndicatesError();
-
-            if (status.Code == ResponseStatusCode.Authenticate)
-            {
-                Authenticate();
-                return;
-            }
+            var requestBytes = await 
_serializer.SerializeMessageAsync(requestMessage, cancellationToken)
+                .ConfigureAwait(false);
 
-            if (receivedMsg.RequestId == null) return;
+            using var content = new ByteArrayContent(requestBytes);
+            content.Headers.ContentType = new 
MediaTypeHeaderValue(GraphBinaryMimeType);
 
-            _callbackByRequestId.TryGetValue(receivedMsg.RequestId.Value, out 
var responseHandler);
-            if (status.Code != ResponseStatusCode.NoContent)
-            {
-                responseHandler?.HandleReceived(receivedMsg);
-            }
+            using var httpRequest = new HttpRequestMessage(HttpMethod.Post, 
_uri);
+            httpRequest.Content = content;
+            httpRequest.Headers.Accept.Add(new 
MediaTypeWithQualityHeaderValue(GraphBinaryMimeType));
 
-            if (status.Code == ResponseStatusCode.Success || status.Code == 
ResponseStatusCode.NoContent)
+            if (_enableCompression)
             {
-                if 
(_cancellationByRequestId.TryRemove(receivedMsg.RequestId.Value, out var 
cancellation))
-                {
-                    cancellation.Dispose();
-                }
-                responseHandler?.Finalize(status.Attributes);
-                _callbackByRequestId.TryRemove(receivedMsg.RequestId.Value, 
out _);
+                httpRequest.Headers.AcceptEncoding.Add(new 
StringWithQualityHeaderValue("deflate"));
             }
-        }
-
-        private void Authenticate()
-        {
-            if (string.IsNullOrEmpty(_username) || 
string.IsNullOrEmpty(_password))
-                throw new InvalidOperationException(
-                    $"The Gremlin Server requires authentication, but no 
credentials are specified - username: {_username}, password: {_password}.");
-
-            var message = 
RequestMessage.Build(Tokens.OpsAuthentication).Processor(Tokens.ProcessorTraversal)
-                .AddArgument(Tokens.ArgsSasl, SaslArgument()).Create();
 
-            _writeQueue.Enqueue((message, CancellationToken.None));
-            BeginSendingMessages();
-        }
-
-        private string SaslArgument()
-        {
-            var auth = $"\0{_username}\0{_password}";
-            var authBytes = Encoding.UTF8.GetBytes(auth);
-            return Convert.ToBase64String(authBytes);
-        }
-
-        private void BeginSendingMessages()
-        {
-            if (Interlocked.CompareExchange(ref _writeInProgress, 1, 0) != 0)
-                return;
-            SendMessagesFromQueueAsync().Forget();
-        }
-
-        private async Task SendMessagesFromQueueAsync()
-        {
-            while (_writeQueue.TryDequeue(out var msg))
+            if (_enableUserAgentOnConnect)
             {
-                try
-                {
-                    await SendMessageAsync(msg.msg, 
msg.cancellationToken).ConfigureAwait(false);
-                }
-                catch (OperationCanceledException e) when 
(msg.cancellationToken == e.CancellationToken)
-                {
-                    // Send was cancelled for this message -> silently catch 
as we want to continue sending from this
-                    //  connection. The task responsible for submitting this 
message will be cancelled by the
-                    //  `ResponseHandlerForSingleRequestMessage`.
-                }
-                catch (Exception e)
-                {
-                    await 
CloseConnectionBecauseOfFailureAsync(e).ConfigureAwait(false);
-                    break;
-                }
+                httpRequest.Headers.TryAddWithoutValidation("User-Agent", 
Utils.UserAgent);
             }
-            Interlocked.CompareExchange(ref _writeInProgress, 0, 1);
 
-            // Since the loop ended and the write in progress was set to 0
-            // a new item could have been added, write queue can contain items 
at this time
-            if (!_writeQueue.IsEmpty && Interlocked.CompareExchange(ref 
_writeInProgress, 1, 0) == 0)
+            if (_bulkResults)
             {
-                await SendMessagesFromQueueAsync().ConfigureAwait(false);
+                httpRequest.Headers.Add("bulkResults", "true");
             }
-        }
 
-        private async Task CloseConnectionBecauseOfFailureAsync(Exception 
exception)
-        {
-            EmptyWriteQueue();
-            await CloseAsync().ConfigureAwait(false);
-            NotifyAboutConnectionFailure(exception);
-        }
+            // Future: apply interceptors here
 
-        private void EmptyWriteQueue()
-        {
-            while (_writeQueue.TryDequeue(out _))
-            {
-            }
-        }
+            using var response = await _httpClient.SendAsync(httpRequest, 
cancellationToken)
+                .ConfigureAwait(false);
 
-        private void NotifyAboutConnectionFailure(Exception exception)
-        {
-            foreach (var cb in _callbackByRequestId.Values)
+            // If the HTTP status indicates an error and the response is not 
GraphBinary
+            // (e.g. a proxy 502 or server 404 returning HTML/plain text), 
fail fast with
+            // a clear message instead of letting the deserializer throw a 
confusing parse error.
+            // When the Content-Type matches GraphBinary, fall through to 
normal deserialization
+            // so the status footer in the GB4 response can surface the 
application-level error.
+            if (!response.IsSuccessStatusCode &&
+                response.Content.Headers.ContentType?.MediaType != 
GraphBinaryMimeType)
             {
-                cb.HandleFailure(exception);
+                var errorBody = await 
response.Content.ReadAsStringAsync().ConfigureAwait(false);
+                throw new HttpRequestException(
+                    $"Gremlin Server returned HTTP {(int)response.StatusCode}: 
{errorBody}");
             }
-            _callbackByRequestId.Clear();
-            DisposeCancellationRegistrations();
-        }
 
-        private async Task SendMessageAsync(RequestMessage message, 
CancellationToken cancellationToken)
-        {
-            if (_sessionEnabled)
-            {
-                message = RebuildSessionMessage(message);
-            }
+            var responseBytes = await 
ReadResponseBytesAsync(response).ConfigureAwait(false);
 
-            var serializedMsg = await 
_messageSerializer.SerializeMessageAsync(message, cancellationToken)
+            var responseMessage = await 
_serializer.DeserializeMessageAsync(responseBytes, cancellationToken)
                 .ConfigureAwait(false);
-#if NET6_0_OR_GREATER
-            if (message.Processor == Tokens.OpsAuthentication)
-            {
-                // Don't compress a message that contains credentials to 
prevent attacks like CRIME or BREACH
-                await 
_webSocketConnection.SendMessageUncompressedAsync(serializedMsg, 
cancellationToken).ConfigureAwait(false);
-                return;
-            }
-#endif
-            await _webSocketConnection.SendMessageAsync(serializedMsg, 
cancellationToken).ConfigureAwait(false);
+
+            return BuildResultSet<T>(responseMessage);
         }
 
-        private RequestMessage RebuildSessionMessage(RequestMessage message)
+        private static async Task<byte[]> 
ReadResponseBytesAsync(HttpResponseMessage response)
         {
-            if (message.Processor == Tokens.OpsAuthentication)
+            using var stream = await 
response.Content.ReadAsStreamAsync().ConfigureAwait(false);
+            if (response.Content.Headers.ContentEncoding.Contains("deflate"))
             {
-                return message;
+                using var deflateStream = new DeflateStream(stream, 
CompressionMode.Decompress);
+                using var ms = new MemoryStream();
+                await deflateStream.CopyToAsync(ms).ConfigureAwait(false);
+                return ms.ToArray();
             }
-
-            var msgBuilder = RequestMessage.Build(message.Operation)
-              
.OverrideRequestId(message.RequestId).Processor(Tokens.ProcessorSession);
-            foreach(var kv in message.Arguments)
-            {
-                msgBuilder.AddArgument(kv.Key, kv.Value);
-            }
-            msgBuilder.AddArgument(Tokens.ArgsSession, _sessionId!);
-            return msgBuilder.Create();
+            using var memoryStream = new MemoryStream();
+            await stream.CopyToAsync(memoryStream).ConfigureAwait(false);
+            return memoryStream.ToArray();
         }
 
-        public async Task CloseAsync()
+        private static ResultSet<T> 
BuildResultSet<T>(ResponseMessage<List<object>> responseMessage)

Review Comment:
   Whats the point of this? You are just making a copy of one list to another?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to