This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1e1c148 Fixing minor stuff
1e1c148 is described below
commit 1e1c148fc0f3304dafbddfa89fc001bd6b8caeee
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Feb 10 16:59:52 2021 +0100
Fixing minor stuff
---
samples/Consuming/Program.cs | 2 +-
samples/Producing/Program.cs | 2 +-
samples/Reading/Program.cs | 2 +-
src/DotPulsar/Internal/ChunkingPipeline.cs | 4 ++++
src/DotPulsar/Internal/Connection.cs | 15 ++++-----------
src/DotPulsar/Internal/ConnectionPool.cs | 2 +-
src/DotPulsar/Internal/Connector.cs | 2 +-
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 2 +-
src/DotPulsar/Internal/DotPulsarEventSource.cs | 2 ++
src/DotPulsar/Internal/NotReadyChannel.cs | 3 ---
src/DotPulsar/Internal/PingPongHandler.cs | 9 ++++-----
src/DotPulsar/Internal/PulsarStream.cs | 16 ++++++++++------
src/DotPulsar/MessageId.cs | 2 +-
src/DotPulsar/MessageMetadata.cs | 2 +-
tests/DotPulsar.StressTests/ConsumerTests.cs | 4 ++--
tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs | 1 -
tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs | 6 +++---
tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs | 3 ++-
18 files changed, 39 insertions(+), 40 deletions(-)
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 80bde82..1570efe 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -25,7 +25,7 @@ namespace Consuming
internal static class Program
{
- private static async Task Main(string[] args)
+ private static async Task Main()
{
const string myTopic = "persistent://public/default/mytopic";
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index ceb74a5..a69bc38 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -24,7 +24,7 @@ namespace Producing
internal static class Program
{
- private static async Task Main(string[] args)
+ private static async Task Main()
{
const string myTopic = "persistent://public/default/mytopic";
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b867e4a..8a2f0fe 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -25,7 +25,7 @@ namespace Reading
internal static class Program
{
- private static async Task Main(string[] args)
+ private static async Task Main()
{
const string myTopic = "persistent://public/default/mytopic";
diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs
b/src/DotPulsar/Internal/ChunkingPipeline.cs
index 74ecf31..2768f69 100644
--- a/src/DotPulsar/Internal/ChunkingPipeline.cs
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -86,7 +86,11 @@ namespace DotPulsar.Internal
{
if (_bufferCount != 0)
{
+#if NETSTANDARD2_0
await _stream.WriteAsync(_buffer, 0,
_bufferCount).ConfigureAwait(false);
+#else
+ await _stream.WriteAsync(_buffer.AsMemory(0,
_bufferCount)).ConfigureAwait(false);
+#endif
_bufferCount = 0;
}
}
diff --git a/src/DotPulsar/Internal/Connection.cs
b/src/DotPulsar/Internal/Connection.cs
index 7829173..02280dd 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,7 +18,6 @@ namespace DotPulsar.Internal
using Exceptions;
using Extensions;
using PulsarApi;
- using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
@@ -172,19 +171,13 @@ namespace DotPulsar.Internal
{
ThrowIfDisposed();
- if (command.Command is null)
- throw new ArgumentNullException(nameof(command.Command));
-
- if (command.Metadata is null)
- throw new ArgumentNullException(nameof(command.Metadata));
-
Task<BaseCommand>? response;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.Command.AsBaseCommand();
+ var baseCommand = command.Command!.AsBaseCommand();
response = _requestResponseHandler.Outgoing(baseCommand);
- var sequence = Serializer.Serialize(baseCommand,
command.Metadata, command.Payload);
+ var sequence = Serializer.Serialize(baseCommand,
command.Metadata!, command.Payload);
await _stream.Send(sequence).ConfigureAwait(false);
}
@@ -218,7 +211,7 @@ namespace DotPulsar.Internal
}
}
- public async Task ProcessIncommingFrames(CancellationToken
cancellationToken)
+ public async Task ProcessIncommingFrames()
{
await Task.Yield();
@@ -247,7 +240,7 @@ namespace DotPulsar.Internal
_channelManager.Incoming(command.CloseProducer);
break;
case BaseCommand.Type.Ping:
- _pingPongHandler.Incoming(command.Ping,
cancellationToken);
+ _pingPongHandler.GotPing();
break;
default:
_requestResponseHandler.Incoming(command);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs
b/src/DotPulsar/Internal/ConnectionPool.cs
index c394cfc..f6eeb60 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -154,7 +154,7 @@ namespace DotPulsar.Internal
var connection = new Connection(new PulsarStream(stream));
DotPulsarEventSource.Log.ConnectionCreated();
_connections[url] = connection;
- _ =
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t =>
DisposeConnection(url));
+ _ = connection.ProcessIncommingFrames().ContinueWith(t =>
DisposeConnection(url));
var commandConnect = _commandConnect;
if (url.ProxyThroughServiceUrl)
diff --git a/src/DotPulsar/Internal/Connector.cs
b/src/DotPulsar/Internal/Connector.cs
index 9fc42c3..52b2774 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -60,7 +60,7 @@ namespace DotPulsar.Internal
return stream;
}
- private async Task<Stream> GetStream(string host, int port)
+ private static async Task<Stream> GetStream(string host, int port)
{
var tcpClient = new TcpClient();
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index b4f26d8..a783827 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -39,7 +39,7 @@ namespace DotPulsar.Internal
exceptionContext.ExceptionHandled = true;
}
- private FaultAction DetermineFaultAction(Exception exception,
CancellationToken cancellationToken)
+ private static FaultAction DetermineFaultAction(Exception exception,
CancellationToken cancellationToken)
=> exception switch
{
TooManyRequestsException _ => FaultAction.Retry,
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs
b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index 7f301f6..e31f5a3 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -46,6 +46,7 @@ namespace DotPulsar.Internal
public sealed class DotPulsarEventSource : EventSource
{
+#pragma warning disable IDE0052 // Remove unread private members
private PollingCounter? _totalClientsCounter;
private long _totalClients;
@@ -75,6 +76,7 @@ namespace DotPulsar.Internal
private PollingCounter? _currentReadersCounter;
private long _currentReaders;
+#pragma warning restore IDE0052 // Remove unread private members
public static readonly DotPulsarEventSource Log = new
DotPulsarEventSource();
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs
b/src/DotPulsar/Internal/NotReadyChannel.cs
index 5590138..e463ff6 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -33,9 +33,6 @@ namespace DotPulsar.Internal
public ValueTask<Message> Receive(CancellationToken cancellationToken
= default)
=> throw GetException();
- public Task<CommandSendReceipt> Send(ulong sequenceId,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
- => throw GetException();
-
public Task<CommandSendReceipt> Send(MessageMetadata metadata,
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
=> throw GetException();
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs
b/src/DotPulsar/Internal/PingPongHandler.cs
index 9e39cc9..da406aa 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -16,7 +16,6 @@ namespace DotPulsar.Internal
{
using Abstractions;
using PulsarApi;
- using System.Threading;
using System.Threading.Tasks;
public sealed class PingPongHandler
@@ -30,14 +29,14 @@ namespace DotPulsar.Internal
_pong = new CommandPong();
}
- public void Incoming(CommandPing ping, CancellationToken
cancellationToken)
- => Task.Factory.StartNew(() => SendPong(cancellationToken));
+ public void GotPing()
+ => Task.Factory.StartNew(() => SendPong());
- private async Task SendPong(CancellationToken cancellationToken)
+ private async Task SendPong()
{
try
{
- await _connection.Send(_pong,
cancellationToken).ConfigureAwait(false);
+ await _connection.Send(_pong, default).ConfigureAwait(false);
}
catch { }
}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs
b/src/DotPulsar/Internal/PulsarStream.cs
index 867f50f..2fc5eb0 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -54,19 +54,23 @@ namespace DotPulsar.Internal
await _pipeline.Send(sequence).ConfigureAwait(false);
}
-#pragma warning disable CS1998 // Async method lacks 'await' operators and
will run synchronously
+#if NETSTANDARD2_0
+ public ValueTask DisposeAsync()
+ {
+ if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
+ _stream.Dispose();
+
+ return new ValueTask();
+ }
+#else
public async ValueTask DisposeAsync()
-#pragma warning restore CS1998 // Async method lacks 'await' operators and
will run synchronously
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
-#if NETSTANDARD2_0
- _stream.Dispose();
-#else
await _stream.DisposeAsync().ConfigureAwait(false);
-#endif
}
+#endif
private async Task FillPipe(CancellationToken cancellationToken)
{
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index b50c2dc..d8d5aef 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -102,7 +102,7 @@ namespace DotPulsar
=> x is not null ? x.CompareTo(y) >= 0 : y is null;
public static bool operator <=(MessageId x, MessageId y)
- => x is not null ? x.CompareTo(y) <= 0 : true;
+ => x is null || x.CompareTo(y) <= 0;
public override bool Equals(object? o)
=> o is MessageId id && Equals(id);
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index fb7f632..23e0c7b 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -26,7 +26,7 @@ namespace DotPulsar
public MessageMetadata()
=> Metadata = new Internal.PulsarApi.MessageMetadata();
- internal readonly Internal.PulsarApi.MessageMetadata Metadata;
+ internal Internal.PulsarApi.MessageMetadata Metadata { get; }
/// <summary>
/// The delivery time of the message as unix time in milliseconds.
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs
b/tests/DotPulsar.StressTests/ConsumerTests.cs
index fa32611..2bc3421 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -70,7 +70,7 @@ namespace DotPulsar.StressTests
consumed.Should().BeEquivalentTo(produced);
}
- private async Task<IEnumerable<MessageId>> ProduceMessages(IProducer
producer, int numberOfMessages, CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer producer, int numberOfMessages, CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
@@ -83,7 +83,7 @@ namespace DotPulsar.StressTests
return messageIds;
}
- private async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer
consumer, int numberOfMessages, CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>>
ConsumeMessages(IConsumer consumer, int numberOfMessages, CancellationToken ct)
{
var messageIds = new List<MessageId>(numberOfMessages);
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 7a7d23a..d460de3 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -13,7 +13,6 @@
*/
#pragma warning disable 8601
-#pragma warning disable 8625
namespace DotPulsar.StressTests
{
diff --git a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index 8263594..332a94e 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -110,13 +110,13 @@ namespace DotPulsar.Tests.Internal
CancellationTokenSource source1 = new CancellationTokenSource(),
source2 = new CancellationTokenSource();
const int excepted = 1;
var queue = new AsyncQueue<int>();
- var task1 = queue.Dequeue(source1.Token);
- var task2 = queue.Dequeue(source2.Token);
+ var task1 = queue.Dequeue(source1.Token).AsTask();
+ var task2 = queue.Dequeue(source2.Token).AsTask();
//Act
source1.Cancel();
queue.Enqueue(excepted);
- var exception = await Record.ExceptionAsync(() =>
task1.AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
+ var exception = await Record.ExceptionAsync(() =>
task1).ConfigureAwait(false);
await task2.ConfigureAwait(false);
//Assert
diff --git a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
index de3895b..65ab5f5 100644
--- a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
@@ -91,7 +91,8 @@ namespace DotPulsar.Tests.Internal
private static int GetNumberOfSegments(ReadOnlySequence<byte> sequence)
{
var numberOfSegments = 0;
- foreach (var segment in sequence)
+ var enumerator = sequence.GetEnumerator();
+ while (enumerator.MoveNext())
++numberOfSegments;
return numberOfSegments;
}