NightOwl888 commented on code in PR #1170:
URL: https://github.com/apache/lucenenet/pull/1170#discussion_r2314476075


##########
src/Lucene.Net.Replicator/Http/ReplicationService.cs:
##########
@@ -118,75 +121,151 @@ private static string 
ExtractRequestParam(IReplicationRequest request, string pa
             return param;
         }
 
-        // LUCENENET specific - copy method not used
-
-        /// <summary>
-        /// Executes the replication task.
-        /// </summary>
-        /// <exception cref="InvalidOperationException">required parameters 
are missing</exception>
-        public virtual void Perform(IReplicationRequest request, 
IReplicationResponse response)
+        // Shared internal logic (generic on response)
+        private async Task ExecuteReplicationInternal<TResponse>(
+            IReplicationRequest request,
+            TResponse response,
+            Func<Stream, Task> copyStreamFunc,
+            Func<SessionToken, Task> writeTokenFunc,
+            Func<Task> flushFunc)
+            where TResponse : IBaseReplicationResponse
         {
             string[] pathElements = GetPathElements(request);
             if (pathElements.Length != 2)
-            {
                 throw ServletException.Create("invalid path, must contain 
shard ID and action, e.g. */s1/update");
-            }
 
             if (!Enum.TryParse(pathElements[ACTION_IDX], true, out 
ReplicationAction action))
-            {
                 throw ServletException.Create("Unsupported action provided: " 
+ pathElements[ACTION_IDX]);
-            }
 
             if (!replicators.TryGetValue(pathElements[SHARD_IDX], out 
IReplicator replicator))
-            {
                 throw ServletException.Create("unrecognized shard ID " + 
pathElements[SHARD_IDX]);
-            }
 
-            // SOLR-8933 Don't close this stream.

Review Comment:
   Please restore this original Lucene comment.



##########
src/Lucene.Net.Replicator/Support/Http/Abstractions/IReplicationService.cs:
##########
@@ -1,4 +1,7 @@
 using System;
+using System.Threading;

Review Comment:
   We no longer need these usings here. Please remove them.



##########
src/Lucene.Net.Tests.Replicator/Http/ReplicationServlet.cs:
##########
@@ -1,15 +1,12 @@
+#if FEATURE_ASPNETCORE_TESTHOST
 using Lucene.Net.Replicator.AspNetCore;
 using Lucene.Net.Replicator.Http.Abstractions;
 using Microsoft.AspNetCore.Builder;
-using Microsoft.AspNetCore.Http.Features;
-using System;
-using System.Threading.Tasks;
-
-#if FEATURE_ASPNETCORE_ENDPOINT_CONFIG
+using Microsoft.AspNetCore.Routing;

Review Comment:
   Please put this after `using Microsoft.AspNetCore.Http;`.



##########
src/Lucene.Net.Replicator/Http/ReplicationService.cs:
##########
@@ -118,75 +121,151 @@ private static string 
ExtractRequestParam(IReplicationRequest request, string pa
             return param;
         }
 
-        // LUCENENET specific - copy method not used
-
-        /// <summary>
-        /// Executes the replication task.
-        /// </summary>
-        /// <exception cref="InvalidOperationException">required parameters 
are missing</exception>
-        public virtual void Perform(IReplicationRequest request, 
IReplicationResponse response)
+        // Shared internal logic (generic on response)
+        private async Task ExecuteReplicationInternal<TResponse>(
+            IReplicationRequest request,
+            TResponse response,
+            Func<Stream, Task> copyStreamFunc,
+            Func<SessionToken, Task> writeTokenFunc,
+            Func<Task> flushFunc)
+            where TResponse : IBaseReplicationResponse
         {
             string[] pathElements = GetPathElements(request);
             if (pathElements.Length != 2)
-            {
                 throw ServletException.Create("invalid path, must contain 
shard ID and action, e.g. */s1/update");
-            }
 
             if (!Enum.TryParse(pathElements[ACTION_IDX], true, out 
ReplicationAction action))
-            {
                 throw ServletException.Create("Unsupported action provided: " 
+ pathElements[ACTION_IDX]);
-            }
 
             if (!replicators.TryGetValue(pathElements[SHARD_IDX], out 
IReplicator replicator))
-            {
                 throw ServletException.Create("unrecognized shard ID " + 
pathElements[SHARD_IDX]);
-            }
 
-            // SOLR-8933 Don't close this stream.
             try
             {
                 switch (action)
                 {
                     case ReplicationAction.OBTAIN:
-                        string sessionId = ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM);
-                        string fileName = ExtractRequestParam(request, 
REPLICATE_FILENAME_PARAM);
-                        string source = ExtractRequestParam(request, 
REPLICATE_SOURCE_PARAM);
-                        using (Stream stream = 
replicator.ObtainFile(sessionId, source, fileName))
-                            stream.CopyTo(response.Body);
-                        break;
+                        {
+                            string sessionId = ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM);
+                            string fileName = ExtractRequestParam(request, 
REPLICATE_FILENAME_PARAM);
+                            string source = ExtractRequestParam(request, 
REPLICATE_SOURCE_PARAM);
 
-                    case ReplicationAction.RELEASE:
-                        replicator.Release(ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM));
-                        break;
+                            using (Stream stream = 
replicator.ObtainFile(sessionId, source, fileName))
+                                await copyStreamFunc(stream);
+                            break;
+                        }
 
-                    case ReplicationAction.UPDATE:
-                        string currentVersion = 
request.QueryParam(REPLICATE_VERSION_PARAM);
-                        SessionToken token = 
replicator.CheckForUpdate(currentVersion);
-                        if (token is null)
+                    case ReplicationAction.RELEASE:
                         {
-                            response.Body.Write(new byte[] { 0 }, 0, 1); // 
marker for null token
+                            replicator.Release(ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM));
+                            break;
                         }
-                        else
+
+                    case ReplicationAction.UPDATE:
                         {
-                            response.Body.Write(new byte[] { 1 }, 0, 1);
-                            token.Serialize(new 
DataOutputStream(response.Body));
+                            string currentVersion = 
request.QueryParam(REPLICATE_VERSION_PARAM);
+                            SessionToken token = 
replicator.CheckForUpdate(currentVersion);
+                            await writeTokenFunc(token);
+                            break;
                         }
-                        break;
 
-                    // LUCENENET specific:
                     default:
                         if (Debugging.AssertsEnabled) Debugging.Assert(false, 
"Invalid ReplicationAction specified");
                         break;
                 }
             }
             catch (Exception)
             {
-                response.StatusCode = (int)HttpStatusCode.InternalServerError; 
// propagate the failure
+                response.StatusCode = (int)HttpStatusCode.InternalServerError;
             }
             finally
             {
-                response.Flush();
+                await flushFunc();
             }
         }
+
+        // For sync
+        private async Task ExecuteReplicationAsync(
+            IReplicationRequest request,
+            IReplicationResponse response,
+            Func<Stream, Task> copyStreamFunc,
+            Func<SessionToken, Task> writeTokenFunc,
+            Func<Task> flushFunc)
+        {
+            await ExecuteReplicationInternal(request, response, 
copyStreamFunc, writeTokenFunc, flushFunc);
+        }
+
+        // For async
+        private async Task ExecuteReplicationAsync(
+            IReplicationRequest request,
+            IAsyncReplicationResponse response,
+            Func<Stream, Task> copyStreamFunc,
+            Func<SessionToken, Task> writeTokenFunc,
+            Func<Task> flushFunc)
+        {
+            await ExecuteReplicationInternal(request, response, 
copyStreamFunc, writeTokenFunc, flushFunc);
+        }
+
+        // LUCENENET specific - copy method not used
+
+        /// <summary>
+        /// Executes the replication task.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">required parameters 
are missing</exception>
+        public virtual void Perform(IReplicationRequest request, 
IReplicationResponse response)
+        {
+            ExecuteReplicationAsync(
+                request,
+                response,
+                stream => { stream.CopyTo(response.Body); return 
Task.CompletedTask; },
+                token =>
+                {
+                    if (token == null)
+                    {
+                        response.Body.Write(new byte[] { 0 }, 0, 1);
+                    }
+                    else
+                    {
+                        response.Body.Write(new byte[] { 1 }, 0, 1);
+                        token.Serialize(new DataOutputStream(response.Body));
+                    }
+                    return Task.CompletedTask;
+                },
+                () => { response.Flush(); return Task.CompletedTask; }
+            ).GetAwaiter().GetResult(); // // keep sync behavior

Review Comment:
   Please change this to:
   
   ```c#
   ).ConfigureAwait(false).GetAwaiter().GetResult(); // keep sync behavior
   ```
   This is safer because it ensures that continuations don’t try to capture the 
current context, which avoids deadlocks.



##########
src/Lucene.Net.Replicator/Support/Http/Abstractions/IReplicationResponse.cs:
##########
@@ -1,4 +1,6 @@
 using System.IO;
+using System.Threading;

Review Comment:
   We no longer need these usings here. Please remove them.



##########
src/Lucene.Net.Replicator/Http/ReplicationService.cs:
##########
@@ -118,75 +121,151 @@ private static string 
ExtractRequestParam(IReplicationRequest request, string pa
             return param;
         }
 
-        // LUCENENET specific - copy method not used
-
-        /// <summary>
-        /// Executes the replication task.
-        /// </summary>
-        /// <exception cref="InvalidOperationException">required parameters 
are missing</exception>
-        public virtual void Perform(IReplicationRequest request, 
IReplicationResponse response)
+        // Shared internal logic (generic on response)
+        private async Task ExecuteReplicationInternal<TResponse>(
+            IReplicationRequest request,
+            TResponse response,
+            Func<Stream, Task> copyStreamFunc,
+            Func<SessionToken, Task> writeTokenFunc,
+            Func<Task> flushFunc)
+            where TResponse : IBaseReplicationResponse
         {
             string[] pathElements = GetPathElements(request);
             if (pathElements.Length != 2)
-            {
                 throw ServletException.Create("invalid path, must contain 
shard ID and action, e.g. */s1/update");
-            }
 
             if (!Enum.TryParse(pathElements[ACTION_IDX], true, out 
ReplicationAction action))
-            {
                 throw ServletException.Create("Unsupported action provided: " 
+ pathElements[ACTION_IDX]);
-            }
 
             if (!replicators.TryGetValue(pathElements[SHARD_IDX], out 
IReplicator replicator))
-            {
                 throw ServletException.Create("unrecognized shard ID " + 
pathElements[SHARD_IDX]);
-            }
 
-            // SOLR-8933 Don't close this stream.
             try
             {
                 switch (action)
                 {
                     case ReplicationAction.OBTAIN:
-                        string sessionId = ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM);
-                        string fileName = ExtractRequestParam(request, 
REPLICATE_FILENAME_PARAM);
-                        string source = ExtractRequestParam(request, 
REPLICATE_SOURCE_PARAM);
-                        using (Stream stream = 
replicator.ObtainFile(sessionId, source, fileName))
-                            stream.CopyTo(response.Body);
-                        break;
+                        {
+                            string sessionId = ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM);
+                            string fileName = ExtractRequestParam(request, 
REPLICATE_FILENAME_PARAM);
+                            string source = ExtractRequestParam(request, 
REPLICATE_SOURCE_PARAM);
 
-                    case ReplicationAction.RELEASE:
-                        replicator.Release(ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM));
-                        break;
+                            using (Stream stream = 
replicator.ObtainFile(sessionId, source, fileName))
+                                await copyStreamFunc(stream);
+                            break;
+                        }
 
-                    case ReplicationAction.UPDATE:
-                        string currentVersion = 
request.QueryParam(REPLICATE_VERSION_PARAM);
-                        SessionToken token = 
replicator.CheckForUpdate(currentVersion);
-                        if (token is null)
+                    case ReplicationAction.RELEASE:
                         {
-                            response.Body.Write(new byte[] { 0 }, 0, 1); // 
marker for null token
+                            replicator.Release(ExtractRequestParam(request, 
REPLICATE_SESSION_ID_PARAM));
+                            break;
                         }
-                        else
+
+                    case ReplicationAction.UPDATE:
                         {
-                            response.Body.Write(new byte[] { 1 }, 0, 1);
-                            token.Serialize(new 
DataOutputStream(response.Body));
+                            string currentVersion = 
request.QueryParam(REPLICATE_VERSION_PARAM);
+                            SessionToken token = 
replicator.CheckForUpdate(currentVersion);
+                            await writeTokenFunc(token);
+                            break;
                         }
-                        break;
 
-                    // LUCENENET specific:
                     default:
                         if (Debugging.AssertsEnabled) Debugging.Assert(false, 
"Invalid ReplicationAction specified");
                         break;
                 }
             }
             catch (Exception)
             {
-                response.StatusCode = (int)HttpStatusCode.InternalServerError; 
// propagate the failure

Review Comment:
   Please restore this original Lucene comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@lucenenet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to