This is an automated email from the ASF dual-hosted git repository.

nightowl888 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucenenet.git

commit 594d6b6dfebea7cdb2729acfd0ac1faabdbd807b
Author: Shad Storhaug <[email protected]>
AuthorDate: Fri Nov 5 20:44:04 2021 +0700

    Lucene.Net.Replicator.HttpClientBase: Use a stream wrapper to consume and 
dispose the stream when there are no more bytes to read.
---
 Directory.Build.targets                          |   2 +-
 src/Lucene.Net.Replicator/Http/HttpClientBase.cs | 131 +++++++++++++++++++++--
 2 files changed, 124 insertions(+), 9 deletions(-)

diff --git a/Directory.Build.targets b/Directory.Build.targets
index b48f3f8..8e87632 100644
--- a/Directory.Build.targets
+++ b/Directory.Build.targets
@@ -44,7 +44,7 @@
 
     
<DefineConstants>$(DefineConstants);FEATURE_CONDITIONALWEAKTABLE_ENUMERATOR</DefineConstants>
     
<DefineConstants>$(DefineConstants);FEATURE_CONDITIONALWEAKTABLE_ADDORUPDATE</DefineConstants>
-    
+    <DefineConstants>$(DefineConstants);FEATURE_SPAN</DefineConstants>
     
   </PropertyGroup>
 
diff --git a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs 
b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs
index 974f5f4..a85b719 100644
--- a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs
+++ b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs
@@ -1,6 +1,5 @@
 using Lucene.Net.Diagnostics;
 using Lucene.Net.Support;
-using Lucene.Net.Util;
 using Newtonsoft.Json;
 using Newtonsoft.Json.Linq;
 using System;
@@ -11,6 +10,7 @@ using System.Net.Http;
 using System.Runtime.ExceptionServices;
 using System.Text;
 using System.Threading;
+using System.Threading.Tasks;
 
 namespace Lucene.Net.Replicator.Http
 {
@@ -90,7 +90,7 @@ namespace Lucene.Net.Replicator.Http
         /// <param name="messageHandler">Optional, The HTTP handler stack to 
use for sending requests.</param>
         //Note: LUCENENET Specific
         protected HttpClientBase(string url, HttpMessageHandler messageHandler 
= null)
-            : this(url, new HttpClient(messageHandler ?? new 
HttpClientHandler()) {  Timeout = DEFAULT_TIMEOUT })
+            : this(url, new HttpClient(messageHandler ?? new 
HttpClientHandler()) { Timeout = DEFAULT_TIMEOUT })
         {
         }
 
@@ -152,7 +152,14 @@ namespace Lucene.Net.Replicator.Http
         {
             if (!response.IsSuccessStatusCode)
             {
-                ThrowKnownError(response);
+                try
+                {
+                    ThrowKnownError(response);
+                }
+                finally
+                {
+                    ConsumeQuietly(response);
+                }
             }
         }
 
@@ -213,9 +220,10 @@ namespace Lucene.Net.Replicator.Http
         protected virtual HttpResponseMessage ExecutePost(string request, 
object entity, params string[] parameters)
         {
             EnsureOpen();
+
             //.NET Note: No headers? No ContentType?... Bad use of Http?
             HttpRequestMessage req = new HttpRequestMessage(HttpMethod.Post, 
QueryString(request, parameters));
-            
+
             req.Content = new StringContent(JToken.FromObject(entity, 
JsonSerializer.Create(ReplicationService.JSON_SERIALIZER_SETTINGS))
                 .ToString(Formatting.None), Encoding.UTF8, "application/json");
 
@@ -244,8 +252,8 @@ namespace Lucene.Net.Replicator.Http
 
         private string QueryString(string request, params string[] parameters)
         {
-            return parameters == null 
-                ? string.Format("{0}/{1}", Url, request) 
+            return parameters == null
+                ? string.Format("{0}/{1}", Url, request)
                 : string.Format("{0}/{1}?{2}", Url, request, string
                 .Join("&", 
parameters.Select(WebUtility.UrlEncode).InPairs((key, val) => 
string.Format("{0}={1}", key, val))));
         }
@@ -267,7 +275,10 @@ namespace Lucene.Net.Replicator.Http
         /// <exception cref="IOException"></exception>
         public virtual Stream ResponseInputStream(HttpResponseMessage 
response, bool consume)
         {
-            return 
response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult();
+            Stream result = 
response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult();
+            if (consume)
+                result = new ConsumingStream(result);
+            return result;
         }
 
         /// <summary>
@@ -313,7 +324,7 @@ namespace Lucene.Net.Replicator.Http
                     {
                         try
                         {
-                            
response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult()?.Dispose();
+                            ConsumeQuietly(response);
                         }
                         finally
                         {
@@ -349,5 +360,109 @@ namespace Lucene.Net.Replicator.Http
             Dispose(true);
             GC.SuppressFinalize(this);
         }
+
+        /// <summary>
+        /// Disposes the content of <paramref name="response"/>, if it has any, and
+        /// swallows any exception for which <c>IsIOException()</c> returns true.
+        /// </summary>
+        /// <param name="response">The response whose content should be released.</param>
+        private static void ConsumeQuietly(HttpResponseMessage response)
+        {
+            try
+            {
+                // LUCENENET: Force a flush and dispose the underlying stream
+                HttpContent content = response.Content;
+                if (content != null)
+                {
+                    content.Dispose();
+                }
+            }
+            catch (Exception e) when (e.IsIOException())
+            {
+                // Best effort: a failure while releasing the response is deliberately ignored.
+            }
+        }
+
+        /// <summary>
+        /// Wraps a stream and consumes (flushes) and disposes it automatically
+        /// as soon as a Read overload reports that there are no more bytes to read.
+        /// All other members are forwarded to the wrapped stream unchanged.
+        /// </summary>
+        private class ConsumingStream : Stream
+        {
+            private readonly Stream input;
+
+            // Set once the wrapped stream has been flushed and disposed, so that
+            // repeated end-of-stream reads do not try to release it again.
+            private bool consumed = false;
+
+            /// <summary>
+            /// Creates a wrapper around <paramref name="input"/>.
+            /// </summary>
+            /// <param name="input">The stream to read from and release at end of stream.</param>
+            /// <exception cref="ArgumentNullException"><paramref name="input"/> is <c>null</c>.</exception>
+            public ConsumingStream(Stream input)
+            {
+                this.input = input ?? throw new ArgumentNullException(nameof(input));
+            }
+
+            public override bool CanRead => input.CanRead;
+
+            public override bool CanSeek => input.CanSeek;
+
+            public override bool CanWrite => input.CanWrite;
+
+            public override long Length => input.Length;
+
+            public override long Position
+            {
+                get => input.Position;
+                set => input.Position = value;
+            }
+
+            public override void Flush() => input.Flush();
+
+            /// <summary>
+            /// Reads one byte from the wrapped stream; releases the wrapped stream
+            /// when the result is -1 (end of stream).
+            /// </summary>
+            public override int ReadByte()
+            {
+                int res = input.ReadByte();
+                Consume(res);
+                return res;
+            }
+
+            /// <summary>
+            /// Reads from the wrapped stream; releases the wrapped stream when
+            /// the result is 0 (no more bytes).
+            /// </summary>
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                int res = input.Read(buffer, offset, count);
+                Consume(res);
+                return res;
+            }
+
+#if FEATURE_SPAN
+            public override int Read(Span<byte> buffer)
+            {
+                int res = input.Read(buffer);
+                Consume(res);
+                return res;
+            }
+#endif
+
+            public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+            {
+                // Library code: do not capture the caller's synchronization context.
+                int res = await input.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
+                Consume(res);
+                return res;
+            }
+
+            // BeginRead must be forwarded as well, so that the IAsyncResult handed to
+            // EndRead below is one that was actually produced by the wrapped stream.
+            public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
+                => input.BeginRead(buffer, offset, count, callback, state);
+
+            public override int EndRead(IAsyncResult asyncResult)
+            {
+                int res = input.EndRead(asyncResult);
+                Consume(res);
+                return res;
+            }
+
+            public override long Seek(long offset, SeekOrigin origin) => input.Seek(offset, origin);
+
+            public override void SetLength(long value) => input.SetLength(value);
+
+            // Forward to the wrapped stream (calling Write on this instance would recurse forever).
+            public override void Write(byte[] buffer, int offset, int count) => input.Write(buffer, offset, count);
+
+            /// <summary>
+            /// Flushes and disposes the wrapped stream the first time a read result
+            /// of zero or less is observed. Exceptions for which
+            /// <c>IsIOException()</c> returns true are ignored.
+            /// </summary>
+            /// <param name="zeroOrMinusOne">The value returned by the read call that was just made.</param>
+            private void Consume(int zeroOrMinusOne)
+            {
+                if (!consumed && zeroOrMinusOne <= 0)
+                {
+                    try
+                    {
+                        try
+                        {
+                            input.Flush();
+                        }
+                        finally
+                        {
+                            // Dispose even if the flush failed.
+                            input.Dispose();
+                        }
+                    }
+                    catch (Exception ioe) when (ioe.IsIOException())
+                    {
+                        // ignored on purpose
+                    }
+                    consumed = true;
+                }
+            }
+        }
     }
 }

Reply via email to