This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 382c44a  Minor cleanup
382c44a is described below

commit 382c44a0fe22444c511c0abf2254ece8dc27b639
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Tue Dec 15 21:22:42 2020 +0100

    Minor cleanup
---
 .../Internal/Abstractions/IConsumerChannel.cs      |  2 +-
 .../Internal/Abstractions/IProducerChannel.cs      |  2 +-
 .../Internal/Abstractions/IReaderChannel.cs        |  4 +--
 src/DotPulsar/Internal/Consumer.cs                 | 40 ++++++++++++++--------
 src/DotPulsar/Internal/Producer.cs                 | 15 +++++---
 src/DotPulsar/Internal/Reader.cs                   | 13 +++++--
 6 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index 4daf83e..31140b2 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -27,6 +27,6 @@ namespace DotPulsar.Internal.Abstractions
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
         ValueTask<Message> Receive(CancellationToken cancellationToken);
-        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+        ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index dd806f4..b214237 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -23,6 +23,6 @@ namespace DotPulsar.Internal.Abstractions
     public interface IProducerChannel : IAsyncDisposable
     {
         Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
-        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+        ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
index a692c18..55135ea 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
@@ -23,7 +23,7 @@ namespace DotPulsar.Internal.Abstractions
     {
         Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
         Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
-        ValueTask<Message> Receive(CancellationToken cancellationToken = default);
-        ValueTask ClosedByClient(CancellationToken cancellationToken = default);
+        ValueTask<Message> Receive(CancellationToken cancellationToken);
+        ValueTask ClosedByClient(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 2282711..c6e9dfc 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -83,7 +83,7 @@ namespace DotPulsar.Internal
                 return;
 
             _eventRegister.Register(new ConsumerDisposed(_correlationId, this));
-            await _channel.ClosedByClient().ConfigureAwait(false);
+            await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
@@ -111,7 +111,13 @@ namespace DotPulsar.Internal
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
         public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
-            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken).ConfigureAwait(false);
+        {
+            ThrowIfDisposed();
+
+            var command = new CommandRedeliverUnacknowledgedMessages();
+            command.MessageIds.AddRange(messageIds.Select(m => m.Data));
+            await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
 
         public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
             => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);
@@ -121,7 +127,12 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var unsubscribe = new CommandUnsubscribe();
-            _ = await _executor.Execute(() => _channel.Send(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
+            await _executor.Execute(() => Unsubscribe(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        private async ValueTask Unsubscribe(CommandUnsubscribe command, CancellationToken cancellationToken)
+        {
+            _ = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
         }
 
         public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
@@ -161,7 +172,12 @@ namespace DotPulsar.Internal
             ThrowIfDisposed();
 
             var getLastMessageId = new CommandGetLastMessageId();
-            var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+            return await _executor.Execute(() => GetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
+        {
+            var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
             return new MessageId(response.LastMessageId);
         }
 
@@ -179,10 +195,7 @@ namespace DotPulsar.Internal
 
             try
             {
-                await _executor.Execute(() =>
-                {
-                    return _channel.Send(commandAck, cancellationToken);
-                }, cancellationToken).ConfigureAwait(false);
+                await _executor.Execute(() => Acknowledge(commandAck, cancellationToken), cancellationToken).ConfigureAwait(false);
             }
             finally
             {
@@ -190,14 +203,11 @@ namespace DotPulsar.Internal
             }
         }
 
-        private async ValueTask RedeliverUnacknowledgedMessages(List<MessageIdData> messageIds, CancellationToken cancellationToken)
-        {
-            ThrowIfDisposed();
+        private async ValueTask Acknowledge(CommandAck command, CancellationToken cancellationToken)
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
-            var redeliverUnacknowledgedMessages = new CommandRedeliverUnacknowledgedMessages();
-            redeliverUnacknowledgedMessages.MessageIds.AddRange(messageIds);
-            await _executor.Execute(() => _channel.Send(redeliverUnacknowledgedMessages, cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
+        private async ValueTask RedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+            => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
 
         internal async ValueTask SetChannel(IConsumerChannel channel)
         {
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 1ac6886..2347eee 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -84,7 +84,7 @@ namespace DotPulsar.Internal
                 return;
 
             _eventRegister.Register(new ProducerDisposed(_correlationId, this));
-            await _channel.ClosedByClient().ConfigureAwait(false);
+            await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 
@@ -97,12 +97,12 @@ namespace DotPulsar.Internal
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
+
             var metadata = _messageMetadataPool.Get();
             try
             {
                 metadata.SequenceId = _sequenceId.FetchNext();
-                var response = await _executor.Execute(() => _channel.Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-                return new MessageId(response.MessageId);
+                return await _executor.Execute(() => Send(metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
             }
             finally
             {
@@ -126,8 +126,7 @@ namespace DotPulsar.Internal
 
             try
             {
-                var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
-                return new MessageId(response.MessageId);
+                return await _executor.Execute(() => Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
             }
             finally
             {
@@ -136,6 +135,12 @@ namespace DotPulsar.Internal
             }
         }
 
+        private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
+        {
+            var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
+            return new MessageId(response.MessageId);
+        }
+
         internal async ValueTask SetChannel(IProducerChannel channel)
         {
             if (_isDisposed != 0)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index eeb95f5..571c5b6 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -73,10 +73,17 @@ namespace DotPulsar.Internal
         public bool IsFinalState(ReaderState state)
             => _state.IsFinalState(state);
 
-        public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default)
+        public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
         {
+            ThrowIfDisposed();
+
             var getLastMessageId = new CommandGetLastMessageId();
-            var response = await _executor.Execute(() => _channel.Send(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+            return await _executor.Execute(() => GetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+
+        private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
+        {
+            var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
             return new MessageId(response.LastMessageId);
         }
 
@@ -129,7 +136,7 @@ namespace DotPulsar.Internal
                 return;
 
             _eventRegister.Register(new ReaderDisposed(_correlationId, this));
-            await _channel.ClosedByClient().ConfigureAwait(false);
+            await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
 

Reply via email to