This is an automated email from the ASF dual-hosted git repository. nightowl888 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/lucenenet.git
commit 594d6b6dfebea7cdb2729acfd0ac1faabdbd807b Author: Shad Storhaug <[email protected]> AuthorDate: Fri Nov 5 20:44:04 2021 +0700 Lucene.Net.Replicator.HttpClientBase: Use a stream wrapper to consume and dispose the stream when there are no more bytes to read. --- Directory.Build.targets | 2 +- src/Lucene.Net.Replicator/Http/HttpClientBase.cs | 131 +++++++++++++++++++++-- 2 files changed, 124 insertions(+), 9 deletions(-) diff --git a/Directory.Build.targets b/Directory.Build.targets index b48f3f8..8e87632 100644 --- a/Directory.Build.targets +++ b/Directory.Build.targets @@ -44,7 +44,7 @@ <DefineConstants>$(DefineConstants);FEATURE_CONDITIONALWEAKTABLE_ENUMERATOR</DefineConstants> <DefineConstants>$(DefineConstants);FEATURE_CONDITIONALWEAKTABLE_ADDORUPDATE</DefineConstants> - + <DefineConstants>$(DefineConstants);FEATURE_SPAN</DefineConstants> </PropertyGroup> diff --git a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs index 974f5f4..a85b719 100644 --- a/src/Lucene.Net.Replicator/Http/HttpClientBase.cs +++ b/src/Lucene.Net.Replicator/Http/HttpClientBase.cs @@ -1,6 +1,5 @@ using Lucene.Net.Diagnostics; using Lucene.Net.Support; -using Lucene.Net.Util; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; @@ -11,6 +10,7 @@ using System.Net.Http; using System.Runtime.ExceptionServices; using System.Text; using System.Threading; +using System.Threading.Tasks; namespace Lucene.Net.Replicator.Http { @@ -90,7 +90,7 @@ namespace Lucene.Net.Replicator.Http /// <param name="messageHandler">Optional, The HTTP handler stack to use for sending requests.</param> //Note: LUCENENET Specific protected HttpClientBase(string url, HttpMessageHandler messageHandler = null) - : this(url, new HttpClient(messageHandler ?? new HttpClientHandler()) { Timeout = DEFAULT_TIMEOUT }) + : this(url, new HttpClient(messageHandler ?? new HttpClientHandler()) { Timeout = DEFAULT_TIMEOUT }) { } @@ -152,7 +152,14 @@ namespace Lucene.Net.Replicator.Http { if (!response.IsSuccessStatusCode) { - ThrowKnownError(response); + try + { + ThrowKnownError(response); + } + finally + { + ConsumeQuietly(response); + } } } @@ -213,9 +220,10 @@ namespace Lucene.Net.Replicator.Http protected virtual HttpResponseMessage ExecutePost(string request, object entity, params string[] parameters) { EnsureOpen(); + //.NET Note: No headers? No ContentType?... Bad use of Http? HttpRequestMessage req = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters)); - + req.Content = new StringContent(JToken.FromObject(entity, JsonSerializer.Create(ReplicationService.JSON_SERIALIZER_SETTINGS)) .ToString(Formatting.None), Encoding.UTF8, "application/json"); @@ -244,8 +252,8 @@ namespace Lucene.Net.Replicator.Http private string QueryString(string request, params string[] parameters) { - return parameters == null - ? string.Format("{0}/{1}", Url, request) + return parameters == null + ? string.Format("{0}/{1}", Url, request) : string.Format("{0}/{1}?{2}", Url, request, string .Join("&", parameters.Select(WebUtility.UrlEncode).InPairs((key, val) => string.Format("{0}={1}", key, val)))); } @@ -267,7 +275,10 @@ namespace Lucene.Net.Replicator.Http /// <exception cref="IOException"></exception> public virtual Stream ResponseInputStream(HttpResponseMessage response, bool consume) { - return response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + Stream result = response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + if (consume) + result = new ConsumingStream(result); + return result; } /// <summary> @@ -313,7 +324,7 @@ namespace Lucene.Net.Replicator.Http { try { - response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult()?.Dispose(); + ConsumeQuietly(response); } finally { @@ -349,5 +360,109 @@ namespace Lucene.Net.Replicator.Http Dispose(true); GC.SuppressFinalize(this); } + + private static void ConsumeQuietly(HttpResponseMessage response) + { + try + { + response.Content?.Dispose(); // LUCENENET: Force a flush and and dispose the underlying stream + } + catch (Exception ioe) when (ioe.IsIOException()) + { + // Ignore + } + } + + /// <summary> + /// Wraps a stream and consumes (flushes) and disposes automatically + /// when the last call to a Read overload occurs. + /// </summary> + private class ConsumingStream : Stream + { + private readonly Stream input; + private bool consumed = false; + + public ConsumingStream(Stream input) + { + this.input = input ?? throw new ArgumentNullException(nameof(input)); + } + + public override bool CanRead => input.CanRead; + + public override bool CanSeek => input.CanSeek; + + public override bool CanWrite => input.CanWrite; + + public override long Length => input.Length; + + public override long Position + { + get => input.Position; + set => input.Position = value; + } + + public override void Flush() => input.Flush(); + + public override int ReadByte() + { + int res = input.ReadByte(); + Consume(res); + return res; + } + public override int Read(byte[] buffer, int offset, int count) + { + int res = input.Read(buffer, offset, count); + Consume(res); + return res; + } + +#if FEATURE_SPAN + public override int Read(Span<byte> buffer) + { + int res = input.Read(buffer); + Consume(res); + return res; + } +#endif + public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + int res = await input.ReadAsync(buffer, offset, count, cancellationToken); + Consume(res); + return res; + } + + public override int EndRead(IAsyncResult asyncResult) + { + int res = input.EndRead(asyncResult); + Consume(res); + return res; + } + public override long Seek(long offset, SeekOrigin origin) => input.Seek(offset, origin); + public override void SetLength(long value) => input.SetLength(value); + public override void Write(byte[] buffer, int offset, int count) => Write(buffer, offset, count); + + private void Consume(int zeroOrMinusOne) + { + if (!consumed && zeroOrMinusOne <= 0) + { + try + { + try + { + input.Flush(); + } + finally + { + input.Dispose(); + } + } + catch (Exception ioe) when (ioe.IsIOException()) + { + // ignored on purpose + } + consumed = true; + } + } + } } }
