NightOwl888 commented on code in PR #1170:
URL: https://github.com/apache/lucenenet/pull/1170#discussion_r2306092350


##########
src/Lucene.Net/Support/IO/StreamExtensions.cs:
##########
@@ -210,5 +213,154 @@ public static long ReadInt64(this Stream stream)
                              buff[6] << 16 | buff[7] << 24);
             return (long)((ulong)hi) << 32 | lo;
         }
+
+        //async versions of the above methods
+        public static async Task WriteInt32Async(this Stream output, int 
value, CancellationToken cancellationToken = default)
+        {
+            if (output is null)
+                throw new ArgumentNullException(nameof(output));
+
+            byte[] buff = new byte[4];
+            buff[0] = (byte)(value >> 24);
+            buff[1] = (byte)(value >> 16);
+            buff[2] = (byte)(value >> 8);
+            buff[3] = (byte)value;
+
+            await output.WriteAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+        }
+
+        public static async Task WriteInt64Async(this Stream output, long 
value, CancellationToken cancellationToken = default)
+        {
+            if (output is null)
+                throw new ArgumentNullException(nameof(output));
+
+            byte[] buff = new byte[8];
+            buff[0] = (byte)(value >> 56);
+            buff[1] = (byte)(value >> 48);
+            buff[2] = (byte)(value >> 40);
+            buff[3] = (byte)(value >> 32);
+            buff[4] = (byte)(value >> 24);
+            buff[5] = (byte)(value >> 16);
+            buff[6] = (byte)(value >> 8);
+            buff[7] = (byte)value;
+
+            await output.WriteAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+        }
+
+        public static async Task WriteUTFAsync(this Stream output, string 
value, CancellationToken cancellationToken = default)
+        {
+            if (output is null)
+                throw new ArgumentNullException(nameof(output));
+            if (value is null)
+                throw new ArgumentNullException(nameof(value));
+
+            long utfCount = CountUTFBytes(value);
+            if (utfCount > ushort.MaxValue)
+                throw new EncoderFallbackException("Encoded string too long.");
+
+            byte[] buffer = new byte[(int)utfCount + 2];
+            int offset = 0;
+            offset = WriteInt16ToBuffer((int)utfCount, buffer, offset);
+            offset = WriteUTFBytesToBuffer(value, (int)utfCount, buffer, 
offset);
+
+            await output.WriteAsync(buffer, 0, offset, 
cancellationToken).ConfigureAwait(false);
+        }
+
+        public static async Task<int> ReadInt32Async(this Stream input, 
CancellationToken cancellationToken = default)
+        {
+            if (input is null)
+                throw new ArgumentNullException(nameof(input));
+
+            byte[] buff = new byte[4];
+            int read = await input.ReadAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+            if (read < buff.Length) throw new EndOfStreamException();
+
+            return (buff[0] << 24) | (buff[1] << 16) | (buff[2] << 8) | 
buff[3];
+        }
+
+        public static async Task<long> ReadInt64Async(this Stream input, 
CancellationToken cancellationToken = default)
+        {
+            if (input is null)
+                throw new ArgumentNullException(nameof(input));
+
+            byte[] buff = new byte[8];
+            int read = await input.ReadAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+            if (read < buff.Length) throw new EndOfStreamException();
+
+            return ((long)buff[0] << 56) |
+                   ((long)buff[1] << 48) |
+                   ((long)buff[2] << 40) |
+                   ((long)buff[3] << 32) |
+                   ((long)buff[4] << 24) |
+                   ((long)buff[5] << 16) |
+                   ((long)buff[6] << 8) |
+                   buff[7];
+        }
+
+        public static async Task<string> ReadUTFAsync(this Stream input, 
CancellationToken cancellationToken = default)
+        {
+            if (input is null)
+                throw new ArgumentNullException(nameof(input));
+
+            byte[] lenBuff = new byte[2];
+            int readLen = await input.ReadAsync(lenBuff, 0, lenBuff.Length, 
cancellationToken).ConfigureAwait(false);
+            if (readLen < lenBuff.Length) throw new EndOfStreamException();

Review Comment:
   According to the docs at: 
https://learn.microsoft.com/en-us/dotnet/api/system.io.stream.readasync?view=net-9.0
   
   > ### Returns
   > 
[Task](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task-1?view=net-9.0)<[Int32](https://learn.microsoft.com/en-us/dotnet/api/system.int32?view=net-9.0)>
   A task that represents the asynchronous read operation. The value of the 
TResult parameter contains the total number of bytes read into the buffer. The 
result value can be less than the number of bytes requested if the number of 
bytes currently available is less than the requested number, or it can be 0 
(zero) if count is 0 or if the end of the stream has been reached.
   
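   So, per the documentation quoted above, there are two cases the check needs to account for:
   
   1. result < length (but greater than 0): a short read. Fewer bytes were currently available than requested; this alone does not mean the end of the stream has been reached, and a further read may return more data.
   2. result == 0: the end of the stream has been reached.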
   
   > Note: This is irrelevant if we use the `System.IO.Pipelines` approach.



##########
src/Lucene.Net.Replicator/Http/ReplicationService.cs:
##########
@@ -118,75 +121,128 @@ private static string 
ExtractRequestParam(IReplicationRequest request, string pa
             return param;
         }
 
-        // LUCENENET specific - copy method not used
-
-        /// <summary>
-        /// Executes the replication task.
-        /// </summary>
-        /// <exception cref="InvalidOperationException">required parameters 
are missing</exception>
-        public virtual void Perform(IReplicationRequest request, 
IReplicationResponse response)
+        // method to avoid code duplication in sync and async Perform methods
+        private async Task ExecuteReplicationAsync(
+            IReplicationRequest request,
+            IReplicationResponse response,
+            Func<Stream, Task> copyStreamFunc,
+            Func<SessionToken, Task> writeTokenFunc,
+            Func<Task> flushFunc)
         {
             string[] pathElements = GetPathElements(request);
             if (pathElements.Length != 2)
-            {
                 throw ServletException.Create("invalid path, must contain 
shard ID and action, e.g. */s1/update");
-            }
 
             if (!Enum.TryParse(pathElements[ACTION_IDX], true, out 
ReplicationAction action))
-            {
                 throw ServletException.Create("Unsupported action provided: " 
+ pathElements[ACTION_IDX]);
-            }
 
             if (!replicators.TryGetValue(pathElements[SHARD_IDX], out 
IReplicator replicator))
-            {
                 throw ServletException.Create("unrecognized shard ID " + 
pathElements[SHARD_IDX]);
-            }
 
-            // SOLR-8933 Don't close this stream.
             try
             {
                 switch (action)
                 {
                     case ReplicationAction.OBTAIN:
-                        string sessionId = ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM);
-                        string fileName = ExtractRequestParam(request, 
REPLICATE_FILENAME_PARAM);
-                        string source = ExtractRequestParam(request, 
REPLICATE_SOURCE_PARAM);
-                        using (Stream stream = 
replicator.ObtainFile(sessionId, source, fileName))
-                            stream.CopyTo(response.Body);
-                        break;
+                        {
+                            string sessionId = ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM);
+                            string fileName = ExtractRequestParam(request, 
REPLICATE_FILENAME_PARAM);
+                            string source = ExtractRequestParam(request, 
REPLICATE_SOURCE_PARAM);
 
-                    case ReplicationAction.RELEASE:
-                        replicator.Release(ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM));
-                        break;
+                            using (Stream stream = 
replicator.ObtainFile(sessionId, source, fileName))
+                                await copyStreamFunc(stream);
+                            break;
+                        }
 
-                    case ReplicationAction.UPDATE:
-                        string currentVersion = 
request.QueryParam(REPLICATE_VERSION_PARAM);
-                        SessionToken token = 
replicator.CheckForUpdate(currentVersion);
-                        if (token is null)
+                    case ReplicationAction.RELEASE:
                         {
-                            response.Body.Write(new byte[] { 0 }, 0, 1); // 
marker for null token
+                            replicator.Release(ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM));
+                            break;
                         }
-                        else
+
+                    case ReplicationAction.UPDATE:
                         {
-                            response.Body.Write(new byte[] { 1 }, 0, 1);
-                            token.Serialize(new 
DataOutputStream(response.Body));
+                            string currentVersion = 
request.QueryParam(REPLICATE_VERSION_PARAM);
+                            SessionToken token = 
replicator.CheckForUpdate(currentVersion);
+                            await writeTokenFunc(token);
+                            break;
                         }
-                        break;
 
-                    // LUCENENET specific:
                     default:
                         if (Debugging.AssertsEnabled) Debugging.Assert(false, 
"Invalid ReplicationAction specified");
                         break;
                 }
             }
             catch (Exception)
             {
-                response.StatusCode = (int)HttpStatusCode.InternalServerError; 
// propagate the failure
+                response.StatusCode = (int)HttpStatusCode.InternalServerError;
             }
             finally
             {
-                response.Flush();
+                await flushFunc();
             }
         }
+
+        // LUCENENET specific - copy method not used
+
+        /// <summary>
+        /// Executes the replication task.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">required parameters 
are missing</exception>
+        public virtual void Perform(IReplicationRequest request, 
IReplicationResponse response)
+        {
+            ExecuteReplicationAsync(
+                request,
+                response,
+                stream => { stream.CopyTo(response.Body); return 
Task.CompletedTask; },
+                token =>
+                {
+                    if (token == null)
+                    {
+                        response.Body.Write(new byte[] { 0 }, 0, 1);
+                    }
+                    else
+                    {
+                        response.Body.Write(new byte[] { 1 }, 0, 1);
+                        token.Serialize(new DataOutputStream(response.Body));
+                    }
+                    return Task.CompletedTask;
+                },
+                () => { response.Flush(); return Task.CompletedTask; }
+            ).GetAwaiter().GetResult(); // // keep sync behavior
+        }
+
+
+        /// <summary>
+        /// Executes the replication task asynchronously.
+        /// </summary>
+        /// <param name="request">The replication request containing action 
and parameters.</param>
+        /// <param name="response">The replication response used to send data 
back to the client.</param>
+        /// <param name="cancellationToken">A <see cref="CancellationToken"/> 
to observe while performing the replication.</param>
+        /// <exception cref="InvalidOperationException">Thrown when required 
parameters are missing or invalid.</exception>
+        public virtual Task PerformAsync(
+            IReplicationRequest request,
+            IReplicationResponse response,
+            CancellationToken cancellationToken = default)
+        {
+            return ExecuteReplicationAsync(
+                request,
+                response,
+                stream => stream.CopyToAsync(response.Body, 81920, 
cancellationToken),

Review Comment:
   I suspect you meant 8192 instead of 81920 here. Please fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@lucenenet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to