This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e1c148  Fixing minor stuff
1e1c148 is described below

commit 1e1c148fc0f3304dafbddfa89fc001bd6b8caeee
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Feb 10 16:59:52 2021 +0100

    Fixing minor stuff
---
 samples/Consuming/Program.cs                            |  2 +-
 samples/Producing/Program.cs                            |  2 +-
 samples/Reading/Program.cs                              |  2 +-
 src/DotPulsar/Internal/ChunkingPipeline.cs              |  4 ++++
 src/DotPulsar/Internal/Connection.cs                    | 15 ++++-----------
 src/DotPulsar/Internal/ConnectionPool.cs                |  2 +-
 src/DotPulsar/Internal/Connector.cs                     |  2 +-
 src/DotPulsar/Internal/DefaultExceptionHandler.cs       |  2 +-
 src/DotPulsar/Internal/DotPulsarEventSource.cs          |  2 ++
 src/DotPulsar/Internal/NotReadyChannel.cs               |  3 ---
 src/DotPulsar/Internal/PingPongHandler.cs               |  9 ++++-----
 src/DotPulsar/Internal/PulsarStream.cs                  | 16 ++++++++++------
 src/DotPulsar/MessageId.cs                              |  2 +-
 src/DotPulsar/MessageMetadata.cs                        |  2 +-
 tests/DotPulsar.StressTests/ConsumerTests.cs            |  4 ++--
 tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs |  1 -
 tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs       |  6 +++---
 tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs |  3 ++-
 18 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 80bde82..1570efe 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -25,7 +25,7 @@ namespace Consuming
 
     internal static class Program
     {
-        private static async Task Main(string[] args)
+        private static async Task Main()
         {
             const string myTopic = "persistent://public/default/mytopic";
 
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index ceb74a5..a69bc38 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -24,7 +24,7 @@ namespace Producing
 
     internal static class Program
     {
-        private static async Task Main(string[] args)
+        private static async Task Main()
         {
             const string myTopic = "persistent://public/default/mytopic";
 
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b867e4a..8a2f0fe 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -25,7 +25,7 @@ namespace Reading
 
     internal static class Program
     {
-        private static async Task Main(string[] args)
+        private static async Task Main()
         {
             const string myTopic = "persistent://public/default/mytopic";
 
diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs 
b/src/DotPulsar/Internal/ChunkingPipeline.cs
index 74ecf31..2768f69 100644
--- a/src/DotPulsar/Internal/ChunkingPipeline.cs
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -86,7 +86,11 @@ namespace DotPulsar.Internal
         {
             if (_bufferCount != 0)
             {
+#if NETSTANDARD2_0
                 await _stream.WriteAsync(_buffer, 0, 
_bufferCount).ConfigureAwait(false);
+#else
+                await _stream.WriteAsync(_buffer.AsMemory(0, 
_bufferCount)).ConfigureAwait(false);
+#endif
                 _bufferCount = 0;
             }
         }
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 7829173..02280dd 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,7 +18,6 @@ namespace DotPulsar.Internal
     using Exceptions;
     using Extensions;
     using PulsarApi;
-    using System;
     using System.Buffers;
     using System.Threading;
     using System.Threading.Tasks;
@@ -172,19 +171,13 @@ namespace DotPulsar.Internal
         {
             ThrowIfDisposed();
 
-            if (command.Command is null)
-                throw new ArgumentNullException(nameof(command.Command));
-
-            if (command.Metadata is null)
-                throw new ArgumentNullException(nameof(command.Metadata));
-
             Task<BaseCommand>? response;
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.Command.AsBaseCommand();
+                var baseCommand = command.Command!.AsBaseCommand();
                 response = _requestResponseHandler.Outgoing(baseCommand);
-                var sequence = Serializer.Serialize(baseCommand, 
command.Metadata, command.Payload);
+                var sequence = Serializer.Serialize(baseCommand, 
command.Metadata!, command.Payload);
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
@@ -218,7 +211,7 @@ namespace DotPulsar.Internal
             }
         }
 
-        public async Task ProcessIncommingFrames(CancellationToken 
cancellationToken)
+        public async Task ProcessIncommingFrames()
         {
             await Task.Yield();
 
@@ -247,7 +240,7 @@ namespace DotPulsar.Internal
                             _channelManager.Incoming(command.CloseProducer);
                             break;
                         case BaseCommand.Type.Ping:
-                            _pingPongHandler.Incoming(command.Ping, 
cancellationToken);
+                            _pingPongHandler.GotPing();
                             break;
                         default:
                             _requestResponseHandler.Incoming(command);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index c394cfc..f6eeb60 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -154,7 +154,7 @@ namespace DotPulsar.Internal
             var connection = new Connection(new PulsarStream(stream));
             DotPulsarEventSource.Log.ConnectionCreated();
             _connections[url] = connection;
-            _ = 
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => 
DisposeConnection(url));
+            _ = connection.ProcessIncommingFrames().ContinueWith(t => 
DisposeConnection(url));
             var commandConnect = _commandConnect;
 
             if (url.ProxyThroughServiceUrl)
diff --git a/src/DotPulsar/Internal/Connector.cs 
b/src/DotPulsar/Internal/Connector.cs
index 9fc42c3..52b2774 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -60,7 +60,7 @@ namespace DotPulsar.Internal
             return stream;
         }
 
-        private async Task<Stream> GetStream(string host, int port)
+        private static async Task<Stream> GetStream(string host, int port)
         {
             var tcpClient = new TcpClient();
 
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs 
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index b4f26d8..a783827 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -39,7 +39,7 @@ namespace DotPulsar.Internal
             exceptionContext.ExceptionHandled = true;
         }
 
-        private FaultAction DetermineFaultAction(Exception exception, 
CancellationToken cancellationToken)
+        private static FaultAction DetermineFaultAction(Exception exception, 
CancellationToken cancellationToken)
             => exception switch
             {
                 TooManyRequestsException _ => FaultAction.Retry,
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs 
b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index 7f301f6..e31f5a3 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -46,6 +46,7 @@ namespace DotPulsar.Internal
 
     public sealed class DotPulsarEventSource : EventSource
     {
+#pragma warning disable IDE0052 // Remove unread private members
         private PollingCounter? _totalClientsCounter;
         private long _totalClients;
 
@@ -75,6 +76,7 @@ namespace DotPulsar.Internal
 
         private PollingCounter? _currentReadersCounter;
         private long _currentReaders;
+#pragma warning restore IDE0052 // Remove unread private members
 
         public static readonly DotPulsarEventSource Log = new 
DotPulsarEventSource();
 
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs 
b/src/DotPulsar/Internal/NotReadyChannel.cs
index 5590138..e463ff6 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -33,9 +33,6 @@ namespace DotPulsar.Internal
         public ValueTask<Message> Receive(CancellationToken cancellationToken 
= default)
             => throw GetException();
 
-        public Task<CommandSendReceipt> Send(ulong sequenceId, 
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
-            => throw GetException();
-
         public Task<CommandSendReceipt> Send(MessageMetadata metadata, 
ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
             => throw GetException();
 
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs 
b/src/DotPulsar/Internal/PingPongHandler.cs
index 9e39cc9..da406aa 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -16,7 +16,6 @@ namespace DotPulsar.Internal
 {
     using Abstractions;
     using PulsarApi;
-    using System.Threading;
     using System.Threading.Tasks;
 
     public sealed class PingPongHandler
@@ -30,14 +29,14 @@ namespace DotPulsar.Internal
             _pong = new CommandPong();
         }
 
-        public void Incoming(CommandPing ping, CancellationToken 
cancellationToken)
-            => Task.Factory.StartNew(() => SendPong(cancellationToken));
+        public void GotPing()
+            => Task.Factory.StartNew(() => SendPong());
 
-        private async Task SendPong(CancellationToken cancellationToken)
+        private async Task SendPong()
         {
             try
             {
-                await _connection.Send(_pong, 
cancellationToken).ConfigureAwait(false);
+                await _connection.Send(_pong, default).ConfigureAwait(false);
             }
             catch { }
         }
diff --git a/src/DotPulsar/Internal/PulsarStream.cs 
b/src/DotPulsar/Internal/PulsarStream.cs
index 867f50f..2fc5eb0 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -54,19 +54,23 @@ namespace DotPulsar.Internal
             await _pipeline.Send(sequence).ConfigureAwait(false);
         }
 
-#pragma warning disable CS1998 // Async method lacks 'await' operators and 
will run synchronously
+#if NETSTANDARD2_0
+        public ValueTask DisposeAsync()
+        {
+            if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
+                _stream.Dispose();
+            
+            return new ValueTask();
+        }
+#else
         public async ValueTask DisposeAsync()
-#pragma warning restore CS1998 // Async method lacks 'await' operators and 
will run synchronously
         {
             if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
                 return;
 
-#if NETSTANDARD2_0
-            _stream.Dispose();
-#else
             await _stream.DisposeAsync().ConfigureAwait(false);
-#endif
         }
+#endif
 
         private async Task FillPipe(CancellationToken cancellationToken)
         {
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index b50c2dc..d8d5aef 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -102,7 +102,7 @@ namespace DotPulsar
             => x is not null ? x.CompareTo(y) >= 0 : y is null;
 
         public static bool operator <=(MessageId x, MessageId y)
-            => x is not null ? x.CompareTo(y) <= 0 : true;
+            => x is null || x.CompareTo(y) <= 0;
 
         public override bool Equals(object? o)
             => o is MessageId id && Equals(id);
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index fb7f632..23e0c7b 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -26,7 +26,7 @@ namespace DotPulsar
         public MessageMetadata()
             => Metadata = new Internal.PulsarApi.MessageMetadata();
 
-        internal readonly Internal.PulsarApi.MessageMetadata Metadata;
+        internal Internal.PulsarApi.MessageMetadata Metadata { get; }
 
         /// <summary>
         /// The delivery time of the message as unix time in milliseconds.
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs 
b/tests/DotPulsar.StressTests/ConsumerTests.cs
index fa32611..2bc3421 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -70,7 +70,7 @@ namespace DotPulsar.StressTests
             consumed.Should().BeEquivalentTo(produced);
         }
 
-        private async Task<IEnumerable<MessageId>> ProduceMessages(IProducer 
producer, int numberOfMessages, CancellationToken ct)
+        private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer producer, int numberOfMessages, CancellationToken ct)
         {
             var messageIds = new MessageId[numberOfMessages];
 
@@ -83,7 +83,7 @@ namespace DotPulsar.StressTests
             return messageIds;
         }
 
-        private async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer 
consumer, int numberOfMessages, CancellationToken ct)
+        private static async Task<IEnumerable<MessageId>> 
ConsumeMessages(IConsumer consumer, int numberOfMessages, CancellationToken ct)
         {
             var messageIds = new List<MessageId>(numberOfMessages);
 
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs 
b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 7a7d23a..d460de3 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -13,7 +13,6 @@
  */
 
 #pragma warning disable 8601
-#pragma warning disable 8625
 
 namespace DotPulsar.StressTests
 {
diff --git a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs 
b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index 8263594..332a94e 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -110,13 +110,13 @@ namespace DotPulsar.Tests.Internal
             CancellationTokenSource source1 = new CancellationTokenSource(), 
source2 = new CancellationTokenSource();
             const int excepted = 1;
             var queue = new AsyncQueue<int>();
-            var task1 = queue.Dequeue(source1.Token);
-            var task2 = queue.Dequeue(source2.Token);
+            var task1 = queue.Dequeue(source1.Token).AsTask();
+            var task2 = queue.Dequeue(source2.Token).AsTask();
 
             //Act
             source1.Cancel();
             queue.Enqueue(excepted);
-            var exception = await Record.ExceptionAsync(() => 
task1.AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
+            var exception = await Record.ExceptionAsync(() => 
task1).ConfigureAwait(false);
             await task2.ConfigureAwait(false);
 
             //Assert
diff --git a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs 
b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
index de3895b..65ab5f5 100644
--- a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
@@ -91,7 +91,8 @@ namespace DotPulsar.Tests.Internal
         private static int GetNumberOfSegments(ReadOnlySequence<byte> sequence)
         {
             var numberOfSegments = 0;
-            foreach (var segment in sequence)
+            var enumerator = sequence.GetEnumerator();
+            while (enumerator.MoveNext())
                 ++numberOfSegments;
             return numberOfSegments;
         }

Reply via email to