NightOwl888 commented on code in PR #1170:
URL: https://github.com/apache/lucenenet/pull/1170#discussion_r2308954835


##########
src/Lucene.Net/Support/IO/StreamExtensions.cs:
##########
@@ -210,5 +213,154 @@ public static long ReadInt64(this Stream stream)
                              buff[6] << 16 | buff[7] << 24);
             return (long)((ulong)hi) << 32 | lo;
         }
+
+        //async versions of the above methods
+        public static async Task WriteInt32Async(this Stream output, int 
value, CancellationToken cancellationToken = default)
+        {
+            if (output is null)
+                throw new ArgumentNullException(nameof(output));
+
+            byte[] buff = new byte[4];
+            buff[0] = (byte)(value >> 24);
+            buff[1] = (byte)(value >> 16);
+            buff[2] = (byte)(value >> 8);
+            buff[3] = (byte)value;
+
+            await output.WriteAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+        }
+
+        public static async Task WriteInt64Async(this Stream output, long 
value, CancellationToken cancellationToken = default)
+        {
+            if (output is null)
+                throw new ArgumentNullException(nameof(output));
+
+            byte[] buff = new byte[8];
+            buff[0] = (byte)(value >> 56);
+            buff[1] = (byte)(value >> 48);
+            buff[2] = (byte)(value >> 40);
+            buff[3] = (byte)(value >> 32);
+            buff[4] = (byte)(value >> 24);
+            buff[5] = (byte)(value >> 16);
+            buff[6] = (byte)(value >> 8);
+            buff[7] = (byte)value;
+
+            await output.WriteAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+        }
+
+        public static async Task WriteUTFAsync(this Stream output, string 
value, CancellationToken cancellationToken = default)
+        {
+            if (output is null)
+                throw new ArgumentNullException(nameof(output));
+            if (value is null)
+                throw new ArgumentNullException(nameof(value));
+
+            long utfCount = CountUTFBytes(value);
+            if (utfCount > ushort.MaxValue)
+                throw new EncoderFallbackException("Encoded string too long.");
+
+            byte[] buffer = new byte[(int)utfCount + 2];
+            int offset = 0;
+            offset = WriteInt16ToBuffer((int)utfCount, buffer, offset);
+            offset = WriteUTFBytesToBuffer(value, (int)utfCount, buffer, 
offset);
+
+            await output.WriteAsync(buffer, 0, offset, 
cancellationToken).ConfigureAwait(false);
+        }
+
+        public static async Task<int> ReadInt32Async(this Stream input, 
CancellationToken cancellationToken = default)
+        {
+            if (input is null)
+                throw new ArgumentNullException(nameof(input));
+
+            byte[] buff = new byte[4];
+            int read = await input.ReadAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+            if (read < buff.Length) throw new EndOfStreamException();
+
+            return (buff[0] << 24) | (buff[1] << 16) | (buff[2] << 8) | 
buff[3];
+        }
+
+        public static async Task<long> ReadInt64Async(this Stream input, 
CancellationToken cancellationToken = default)
+        {
+            if (input is null)
+                throw new ArgumentNullException(nameof(input));
+
+            byte[] buff = new byte[8];
+            int read = await input.ReadAsync(buff, 0, buff.Length, 
cancellationToken).ConfigureAwait(false);
+            if (read < buff.Length) throw new EndOfStreamException();
+
+            return ((long)buff[0] << 56) |
+                   ((long)buff[1] << 48) |
+                   ((long)buff[2] << 40) |
+                   ((long)buff[3] << 32) |
+                   ((long)buff[4] << 24) |
+                   ((long)buff[5] << 16) |
+                   ((long)buff[6] << 8) |
+                   buff[7];
+        }
+
+        public static async Task<string> ReadUTFAsync(this Stream input, 
CancellationToken cancellationToken = default)
+        {
+            if (input is null)
+                throw new ArgumentNullException(nameof(input));
+
+            byte[] lenBuff = new byte[2];
+            int readLen = await input.ReadAsync(lenBuff, 0, lenBuff.Length, 
cancellationToken).ConfigureAwait(false);
+            if (readLen < lenBuff.Length) throw new EndOfStreamException();
+
+            int length = (lenBuff[0] << 8) | lenBuff[1];
+            byte[] buffer = new byte[length];

Review Comment:
   @NehanPathan
   
   As I previously mentioned, the async tests are also failing CheckIndex 
(meaning that there is index corruption). 
   
   ```console
     Message: 
       TearDown : Lucene.Net.Util.LuceneSystemException : CheckIndex failed
   
     Standard Output: 
       CheckIndex failed
       ERROR: could not read any segments file in directory
       System.IO.IOException: did not read all bytes from file: read 241 vs 
size 482 (resource: 
BufferedChecksumIndexInput(MockIndexInputWrapper(RAMInputStream(name=_0.si))))
          at Lucene.Net.Codecs.CodecUtil.CheckFooter(ChecksumIndexInput in) in 
F:\Projects\lucenenet\src\Lucene.Net\Codecs\CodecUtil.cs:line 218
          at 
Lucene.Net.Codecs.Lucene46.Lucene46SegmentInfoReader.Read(Directory dir, String 
segment, IOContext context) in 
F:\Projects\lucenenet\src\Lucene.Net\Codecs\Lucene46\Lucene46SegmentInfoReader.cs:line
 64
          at Lucene.Net.Index.SegmentInfos.Read(Directory directory, String 
segmentFileName) in 
F:\Projects\lucenenet\src\Lucene.Net\Index\SegmentInfos.cs:line 534
          at 
Lucene.Net.Index.SegmentInfos.FindSegmentsFileAnonymousClass.DoBody(String 
segmentFileName) in 
F:\Projects\lucenenet\src\Lucene.Net\Index\SegmentInfos.cs:line 640
          at Lucene.Net.Index.SegmentInfos.FindSegmentsFile.Run(IndexCommit 
commit) in F:\Projects\lucenenet\src\Lucene.Net\Index\SegmentInfos.cs:line 1158
       --- End of stack trace from previous location ---
          at Lucene.Net.Index.SegmentInfos.FindSegmentsFile.Run(IndexCommit 
commit) in F:\Projects\lucenenet\src\Lucene.Net\Index\SegmentInfos.cs:line 1136
          at Lucene.Net.Index.SegmentInfos.FindSegmentsFile.Run() in 
F:\Projects\lucenenet\src\Lucene.Net\Index\SegmentInfos.cs:line 965
          at Lucene.Net.Index.SegmentInfos.Read(Directory directory) in 
F:\Projects\lucenenet\src\Lucene.Net\Index\SegmentInfos.cs:line 625
          at Lucene.Net.Index.CheckIndex.DoCheckIndex(IList`1 onlySegments) in 
F:\Projects\lucenenet\src\Lucene.Net\Index\CheckIndex.cs:line 515
   
   ```
   
   There are 2 test failures:
   
   - HttpReplicatorTest.TestBasic
   - HttpReplicatorTest.TestServerErrors
   
   Note that the exact message changes every time the test is run, but it 
always fails `CheckIndex`.
   
   To see the test failures, change the following lines:
   
   - 
https://github.com/apache/lucenenet/blob/0a8d39c2d897792e60f28361d6a206231c16c629/src/Lucene.Net.Tests.Replicator/Http/ReplicationServlet.cs#L93-L94
   
   <details>
       <summary>ReplicationServiceMiddleware - Click to expand</summary>
   
   ```c#
   public class ReplicationServiceMiddleware
   {
       private readonly RequestDelegate next;
       private readonly IReplicationService service;
   
       public ReplicationServiceMiddleware(RequestDelegate next, 
IReplicationService service)
       {
           this.next = next ?? throw new ArgumentNullException(nameof(next));
           this. Service = service ?? throw new 
ArgumentNullException(nameof(service));
       }
   
       public async Task InvokeAsync(HttpContext context)
       {
           // LUCENENET: This is to allow synchronous IO to happen for these 
requests.
           // LUCENENET TODO: Allow async operations from Replicator.
           var syncIoFeature = context.Features.Get<IHttpBodyControlFeature>();
           if (syncIoFeature != null)
           {
               syncIoFeature.AllowSynchronousIO = true;
           }
   
           await service.PerformAsync(context.Request, context.Response, 
context.RequestAborted);
   
           // This is a terminating endpoint. Do not call the next 
delegate/middleware in the pipeline.
       }
   }
   ```
   </details>
   
   Note that `AllowSynchronousIO` is required for the tests to function (even 
with the current changes).
   
   You will also need to add a `PerformAsync()` extension method for the above 
code to compile:
   
   <details>
       <summary>AspNetCoreReplicationServiceExtentions - Click to 
expand</summary>
   
   ```c#
   public static class AspNetCoreReplicationServiceExtentions
   {
       /// <summary>
       /// Extension method that mirrors the signature of <see 
cref="IReplicationService.PerformAsync"/> using AspNetCore as implementation.
       /// </summary>
       public static async Task PerformAsync(this IReplicationService self, 
HttpRequest request, HttpResponse response, CancellationToken cancellationToken 
= default)
       {
           await self.PerformAsync(new AspNetCoreReplicationRequest(request), 
new AspNetCoreReplicationResponse(response), cancellationToken);
       }
   }
   ```
   </details>
   
   The replicator orchestrates the sync operation safely by preventing any 
index writes during the copy operation. I suspect the failures are because the 
replication server currently depends on synchronous I/O to block index updates 
from occurring until the operation is complete. That is just a guess, though. 
There may be a different bug to track down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@lucenenet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to