This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3588716  pass cancellation token to async lock
3588716 is described below

commit 3588716ea238134cbc6242940d65949e832ae6f7
Author: Vince Pergolizzi <[email protected]>
AuthorDate: Thu Mar 12 23:17:54 2020 -0400

    pass cancellation token to async lock
---
 src/DotPulsar.Tests/Internal/AsyncLockTests.cs     | 16 ++---
 src/DotPulsar/Internal/Abstractions/IConnection.cs | 31 +++++-----
 .../Internal/Abstractions/IConsumerChannel.cs      | 10 +--
 .../Internal/Abstractions/IProducerChannel.cs      |  5 +-
 src/DotPulsar/Internal/AsyncLock.cs                |  2 +-
 src/DotPulsar/Internal/Connection.cs               | 71 +++++++++++++---------
 src/DotPulsar/Internal/ConnectionPool.cs           | 12 ++--
 src/DotPulsar/Internal/Consumer.cs                 |  8 +--
 src/DotPulsar/Internal/ConsumerChannel.cs          | 31 +++++-----
 src/DotPulsar/Internal/ConsumerChannelFactory.cs   |  2 +-
 src/DotPulsar/Internal/NotReadyChannel.cs          | 13 ++--
 src/DotPulsar/Internal/PingPongHandler.cs          |  5 +-
 src/DotPulsar/Internal/Producer.cs                 |  4 +-
 src/DotPulsar/Internal/ProducerChannel.cs          | 15 ++---
 src/DotPulsar/Internal/ProducerChannelFactory.cs   |  2 +-
 src/DotPulsar/Internal/ReaderChannelFactory.cs     |  2 +-
 16 files changed, 125 insertions(+), 104 deletions(-)

diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs 
b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
index 3ce1b95..766e9ad 100644
--- a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -29,7 +29,7 @@ namespace DotPulsar.Tests.Internal
             var sut = new AsyncLock();
 
             //Act
-            var actual = sut.Lock();
+            var actual = sut.Lock(CancellationToken.None);
 
             //Assert
             Assert.True(actual.IsCompleted);
@@ -44,10 +44,10 @@ namespace DotPulsar.Tests.Internal
         {
             //Arrange
             var sut = new AsyncLock();
-            var alreadyTaken = await sut.Lock();
+            var alreadyTaken = await sut.Lock(CancellationToken.None);
 
             //Act
-            var actual = sut.Lock();
+            var actual = sut.Lock(CancellationToken.None);
 
             //Assert
             Assert.False(actual.IsCompleted);
@@ -66,7 +66,7 @@ namespace DotPulsar.Tests.Internal
             await sut.DisposeAsync();
 
             //Act
-            var exception = await Record.ExceptionAsync(() => sut.Lock());
+            var exception = await Record.ExceptionAsync(() => 
sut.Lock(CancellationToken.None));
 
             //Assert
             Assert.IsType<ObjectDisposedException>(exception);
@@ -77,8 +77,8 @@ namespace DotPulsar.Tests.Internal
         {
             //Arrange
             var sut = new AsyncLock();
-            var gotLock = await sut.Lock();
-            var awaiting = sut.Lock();
+            var gotLock = await sut.Lock(CancellationToken.None);
+            var awaiting = sut.Lock(CancellationToken.None);
             _ = Task.Run(async () => await sut.DisposeAsync());
 
             //Act
@@ -98,7 +98,7 @@ namespace DotPulsar.Tests.Internal
             //Arrange
             var cts = new CancellationTokenSource();
             var sut = new AsyncLock();
-            var gotLock = await sut.Lock();
+            var gotLock = await sut.Lock(CancellationToken.None);
             var awaiting = sut.Lock(cts.Token);
 
             //Act
@@ -119,7 +119,7 @@ namespace DotPulsar.Tests.Internal
         {
             //Arrange
             var sut = new AsyncLock();
-            var gotLock = await sut.Lock();
+            var gotLock = await sut.Lock(CancellationToken.None);
             var disposeTask = Task.Run(async () => await sut.DisposeAsync());
             Assert.False(disposeTask.IsCompleted);
 
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs 
b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 459300e..cce261d 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -14,29 +14,30 @@
 
 using DotPulsar.Internal.PulsarApi;
 using System;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal.Abstractions
 {
     public interface IConnection : IAsyncDisposable
     {
-        ValueTask<bool> HasChannels();
+        ValueTask<bool> HasChannels(CancellationToken cancellationToken);
 
-        Task<ProducerResponse> Send(CommandProducer command, IChannel channel);
-        Task<SubscribeResponse> Send(CommandSubscribe command, IChannel 
channel);
+        Task<ProducerResponse> Send(CommandProducer command, IChannel channel, 
CancellationToken cancellationToken);
+        Task<SubscribeResponse> Send(CommandSubscribe command, IChannel 
channel, CancellationToken cancellationToken);
 
-        Task Send(CommandPing command);
-        Task Send(CommandPong command);
-        Task Send(CommandAck command);
-        Task Send(CommandFlow command);
+        Task Send(CommandPing command, CancellationToken cancellationToken);
+        Task Send(CommandPong command, CancellationToken cancellationToken);
+        Task Send(CommandAck command, CancellationToken cancellationToken);
+        Task Send(CommandFlow command, CancellationToken cancellationToken);
 
-        Task<BaseCommand> Send(CommandUnsubscribe command);
-        Task<BaseCommand> Send(CommandConnect command);
-        Task<BaseCommand> Send(CommandLookupTopic command);
-        Task<BaseCommand> Send(CommandSeek command);
-        Task<BaseCommand> Send(CommandGetLastMessageId command);
-        Task<BaseCommand> Send(CommandCloseProducer command);
-        Task<BaseCommand> Send(CommandCloseConsumer command);
-        Task<BaseCommand> Send(SendPackage command);
+        Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken 
cancellationToken);
+        Task<BaseCommand> Send(CommandConnect command, CancellationToken 
cancellationToken);
+        Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken 
cancellationToken);
+        Task<BaseCommand> Send(CommandSeek command, CancellationToken 
cancellationToken);
+        Task<BaseCommand> Send(CommandGetLastMessageId command, 
CancellationToken cancellationToken);
+        Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken 
cancellationToken);
+        Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken 
cancellationToken);
+        Task<BaseCommand> Send(SendPackage command, CancellationToken 
cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index dc05155..a2cbe34 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -21,10 +21,10 @@ namespace DotPulsar.Internal.Abstractions
 {
     public interface IConsumerChannel : IAsyncDisposable
     {
-        Task Send(CommandAck command);
-        Task<CommandSuccess> Send(CommandUnsubscribe command);
-        Task<CommandSuccess> Send(CommandSeek command);
-        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId 
command);
-        ValueTask<Message> Receive(CancellationToken cancellationToken = 
default);
+        Task Send(CommandAck command, CancellationToken cancellationToken);
+        Task<CommandSuccess> Send(CommandUnsubscribe command, 
CancellationToken cancellationToken);
+        Task<CommandSuccess> Send(CommandSeek command, CancellationToken 
cancellationToken);
+        Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId 
command, CancellationToken cancellationToken);
+        ValueTask<Message> Receive(CancellationToken cancellationToken);
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 57276ce..f15fca2 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -15,13 +15,14 @@
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Buffers;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal.Abstractions
 {
     public interface IProducerChannel : IAsyncDisposable
     {
-        Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload);
-        Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, 
ReadOnlySequence<byte> payload);
+        Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, 
CancellationToken cancellationToken);
+        Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, 
ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
     }
 }
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/AsyncLock.cs 
b/src/DotPulsar/Internal/AsyncLock.cs
index a374518..de04523 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -36,7 +36,7 @@ namespace DotPulsar.Internal
             _isDisposed = false;
         }
 
-        public Task<IDisposable> Lock(CancellationToken cancellationToken = 
default)
+        public Task<IDisposable> Lock(CancellationToken cancellationToken)
         {
             LinkedListNode<CancelableCompletionSource<IDisposable>>? node = 
null;
 
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index b14770a..eb27e11 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,6 +15,7 @@
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal
@@ -36,19 +37,19 @@ namespace DotPulsar.Internal
             _stream = stream;
         }
 
-        public async ValueTask<bool> HasChannels()
+        public async ValueTask<bool> HasChannels(CancellationToken 
cancellationToken)
         {
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 return _channelManager.HasChannels();
             }
         }
 
-        public async Task<ProducerResponse> Send(CommandProducer command, 
IChannel channel)
+        public async Task<ProducerResponse> Send(CommandProducer command, 
IChannel channel, CancellationToken cancellationToken)
         {
             Task<ProducerResponse>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 var requestResponseTask = 
_requestResponseHandler.Outgoing(baseCommand);
@@ -60,11 +61,11 @@ namespace DotPulsar.Internal
             return await responseTask;
         }
 
-        public async Task<SubscribeResponse> Send(CommandSubscribe command, 
IChannel channel)
+        public async Task<SubscribeResponse> Send(CommandSubscribe command, 
IChannel channel, CancellationToken cancellationToken)
         {
             Task<SubscribeResponse>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 var requestResponseTask = 
_requestResponseHandler.Outgoing(baseCommand);
@@ -76,16 +77,23 @@ namespace DotPulsar.Internal
             return await responseTask;
         }
 
-        public async Task Send(CommandPing command) => await 
Send(command.AsBaseCommand());
-        public async Task Send(CommandPong command) => await 
Send(command.AsBaseCommand());
-        public async Task Send(CommandAck command) => await 
Send(command.AsBaseCommand());
-        public async Task Send(CommandFlow command) => await 
Send(command.AsBaseCommand());
+        public async Task Send(CommandPing command, CancellationToken 
cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
 
-        public async Task<BaseCommand> Send(CommandUnsubscribe command)
+        public async Task Send(CommandPong command, CancellationToken 
cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
+
+        public async Task Send(CommandAck command, CancellationToken 
cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
+
+        public async Task Send(CommandFlow command, CancellationToken 
cancellationToken) =>
+            await Send(command.AsBaseCommand(), cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandUnsubscribe command, 
CancellationToken cancellationToken)
         {
             Task<BaseCommand>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -97,16 +105,23 @@ namespace DotPulsar.Internal
             return await responseTask;
         }
 
-        public async Task<BaseCommand> Send(CommandConnect command) => await 
SendRequestResponse(command.AsBaseCommand());
-        public async Task<BaseCommand> Send(CommandLookupTopic command) => 
await SendRequestResponse(command.AsBaseCommand());
-        public async Task<BaseCommand> Send(CommandSeek command) => await 
SendRequestResponse(command.AsBaseCommand());
-        public async Task<BaseCommand> Send(CommandGetLastMessageId command) 
=> await SendRequestResponse(command.AsBaseCommand());
+        public async Task<BaseCommand> Send(CommandConnect command, 
CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), 
cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandLookupTopic command, 
CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), 
cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandSeek command, 
CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), 
cancellationToken);
+
+        public async Task<BaseCommand> Send(CommandGetLastMessageId command, 
CancellationToken cancellationToken) =>
+            await SendRequestResponse(command.AsBaseCommand(), 
cancellationToken);
 
-        public async Task<BaseCommand> Send(CommandCloseProducer command)
+        public async Task<BaseCommand> Send(CommandCloseProducer command, 
CancellationToken cancellationToken)
         {
             Task<BaseCommand>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -118,11 +133,11 @@ namespace DotPulsar.Internal
             return await responseTask;
         }
 
-        public async Task<BaseCommand> Send(CommandCloseConsumer command)
+        public async Task<BaseCommand> Send(CommandCloseConsumer command, 
CancellationToken cancellationToken)
         {
             Task<BaseCommand>? responseTask = null;
 
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.AsBaseCommand();
                 responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -134,10 +149,10 @@ namespace DotPulsar.Internal
             return await responseTask;
         }
 
-        public async Task<BaseCommand> Send(SendPackage command)
+        public async Task<BaseCommand> Send(SendPackage command, 
CancellationToken cancellationToken)
         {
             Task<BaseCommand>? response = null;
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var baseCommand = command.Command.AsBaseCommand();
                 response = _requestResponseHandler.Outgoing(baseCommand);
@@ -147,10 +162,10 @@ namespace DotPulsar.Internal
             return await response;
         }
 
-        private async Task<BaseCommand> SendRequestResponse(BaseCommand 
command)
+        private async Task<BaseCommand> SendRequestResponse(BaseCommand 
command, CancellationToken cancellationToken)
         {
             Task<BaseCommand>? response = null;
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 response = _requestResponseHandler.Outgoing(command);
                 var sequence = Serializer.Serialize(command);
@@ -159,16 +174,16 @@ namespace DotPulsar.Internal
             return await response;
         }
 
-        private async Task Send(BaseCommand command)
+        private async Task Send(BaseCommand command, CancellationToken 
cancellationToken)
         {
-            using (await _lock.Lock())
+            using (await _lock.Lock(cancellationToken))
             {
                 var sequence = Serializer.Serialize(command);
                 await _stream.Send(sequence);
             }
         }
 
-        public async Task ProcessIncommingFrames()
+        public async Task ProcessIncommingFrames(CancellationToken 
cancellationToken)
         {
             await Task.Yield();
 
@@ -197,7 +212,7 @@ namespace DotPulsar.Internal
                             _channelManager.Incoming(command.CloseProducer);
                             break;
                         case BaseCommand.Type.Ping:
-                            _pingPongHandler.Incoming(command.Ping);
+                            _pingPongHandler.Incoming(command.Ping, 
cancellationToken);
                             break;
                         default:
                             _requestResponseHandler.Incoming(command);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs 
b/src/DotPulsar/Internal/ConnectionPool.cs
index d3dff22..7c1f7b5 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -74,7 +74,7 @@ namespace DotPulsar.Internal
             while (true)
             {
                 var connection = await GetConnection(serviceUrl, 
cancellationToken);
-                var response = await connection.Send(lookup);
+                var response = await connection.Send(lookup, 
cancellationToken);
                 response.Expect(BaseCommand.Type.LookupResponse);
 
                 if (response.LookupTopicResponse.Response == 
CommandLookupTopicResponse.LookupType.Failed)
@@ -124,18 +124,18 @@ namespace DotPulsar.Internal
                 if (_connections.TryGetValue(serviceUrl, out Connection 
connection))
                     return connection;
 
-                return await EstablishNewConnection(serviceUrl);
+                return await EstablishNewConnection(serviceUrl, 
cancellationToken);
             }
         }
 
-        private async Task<Connection> EstablishNewConnection(Uri serviceUrl)
+        private async Task<Connection> EstablishNewConnection(Uri serviceUrl, 
CancellationToken cancellationToken)
         {
             var stream = await _connector.Connect(serviceUrl);
             var connection = new Connection(new PulsarStream(stream));
             DotPulsarEventSource.Log.ConnectionCreated();
             _connections[serviceUrl] = connection;
-            _ = connection.ProcessIncommingFrames().ContinueWith(t => 
DisposeConnection(serviceUrl));
-            var response = await connection.Send(_commandConnect);
+            _ = 
connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => 
DisposeConnection(serviceUrl));
+            var response = await connection.Send(_commandConnect, 
cancellationToken);
             response.Expect(BaseCommand.Type.Connected);
             return connection;
         }
@@ -165,7 +165,7 @@ namespace DotPulsar.Internal
                             var connection = _connections[serviceUrl];
                             if (connection is null)
                                 continue;
-                            if (!await connection.HasChannels())
+                            if (!await 
connection.HasChannels(cancellationToken))
                                 await DisposeConnection(serviceUrl);
                         }
                     }
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 440961d..9b623ad 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -96,21 +96,21 @@ namespace DotPulsar.Internal
         public async ValueTask Unsubscribe(CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            _ = await _executor.Execute(() => _channel.Send(new 
CommandUnsubscribe()), cancellationToken);
+            _ = await _executor.Execute(() => _channel.Send(new 
CommandUnsubscribe(), cancellationToken), cancellationToken);
         }
 
         public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
         {
             ThrowIfDisposed();
             var seek = new CommandSeek { MessageId = messageId.Data };
-            _ = await _executor.Execute(() => _channel.Send(seek), 
cancellationToken);
+            _ = await _executor.Execute(() => _channel.Send(seek, 
cancellationToken), cancellationToken);
             return;
         }
 
         public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(new 
CommandGetLastMessageId()), cancellationToken);
+            var response = await _executor.Execute(() => _channel.Send(new 
CommandGetLastMessageId(), cancellationToken), cancellationToken);
             return new MessageId(response.LastMessageId);
         }
 
@@ -122,7 +122,7 @@ namespace DotPulsar.Internal
                 _cachedCommandAck.Type = ackType;
                 _cachedCommandAck.MessageIds.Clear();
                 _cachedCommandAck.MessageIds.Add(messageIdData);
-                return _channel.Send(_cachedCommandAck);
+                return _channel.Send(_cachedCommandAck, cancellationToken);
             }, cancellationToken);
         }
 
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs 
b/src/DotPulsar/Internal/ConsumerChannel.cs
index 9373da1..735b078 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -52,7 +52,7 @@ namespace DotPulsar.Internal
             while (true)
             {
                 if (_sendWhenZero == 0)
-                    await SendFlow();
+                    await SendFlow(cancellationToken);
 
                 _sendWhenZero--;
 
@@ -64,7 +64,7 @@ namespace DotPulsar.Internal
 
                 if (!messagePackage.IsValid())
                 {
-                    await RejectPackage(messagePackage);
+                    await RejectPackage(messagePackage, cancellationToken);
                     continue;
                 }
 
@@ -79,7 +79,7 @@ namespace DotPulsar.Internal
             }
         }
 
-        public async Task Send(CommandAck command)
+        public async Task Send(CommandAck command, CancellationToken 
cancellationToken)
         {
             var messageId = command.MessageIds[0];
             if (messageId.BatchIndex != -1)
@@ -92,30 +92,30 @@ namespace DotPulsar.Internal
             }
 
             command.ConsumerId = _id;
-            await _connection.Send(command);
+            await _connection.Send(command, cancellationToken);
         }
 
-        public async Task<CommandSuccess> Send(CommandUnsubscribe command)
+        public async Task<CommandSuccess> Send(CommandUnsubscribe command, 
CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
-            var response = await _connection.Send(command);
+            var response = await _connection.Send(command, cancellationToken);
             response.Expect(BaseCommand.Type.Success);
             return response.Success;
         }
 
-        public async Task<CommandSuccess> Send(CommandSeek command)
+        public async Task<CommandSuccess> Send(CommandSeek command, 
CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
-            var response = await _connection.Send(command);
+            var response = await _connection.Send(command, cancellationToken);
             response.Expect(BaseCommand.Type.Success);
             _batchHandler.Clear();
             return response.Success;
         }
 
-        public async Task<CommandGetLastMessageIdResponse> 
Send(CommandGetLastMessageId command)
+        public async Task<CommandGetLastMessageIdResponse> 
Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
         {
             command.ConsumerId = _id;
-            var response = await _connection.Send(command);
+            var response = await _connection.Send(command, cancellationToken);
             response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
             return response.GetLastMessageIdResponse;
         }
@@ -125,7 +125,7 @@ namespace DotPulsar.Internal
             try
             {
                 _queue.Dispose();
-                await _connection.Send(new CommandCloseConsumer { ConsumerId = 
_id });
+                await _connection.Send(new CommandCloseConsumer { ConsumerId = 
_id }, CancellationToken.None);
             }
             catch
             {
@@ -133,9 +133,10 @@ namespace DotPulsar.Internal
             }
         }
 
-        private async ValueTask SendFlow()
+        private async ValueTask SendFlow(CancellationToken cancellationToken)
         {
-            await _connection.Send(_cachedCommandFlow); //TODO Should sending 
the flow command be handled on another thread and thereby not slow down the 
consumer?
+            //TODO Should sending the flow command be handled on another 
thread and thereby not slow down the consumer?
+            await _connection.Send(_cachedCommandFlow, cancellationToken);
 
             if (_firstFlow)
             {
@@ -146,7 +147,7 @@ namespace DotPulsar.Internal
             _sendWhenZero = _cachedCommandFlow.MessagePermits;
         }
 
-        private async Task RejectPackage(MessagePackage messagePackage)
+        private async Task RejectPackage(MessagePackage messagePackage, 
CancellationToken cancellationToken)
         {
             var ack = new CommandAck
             {
@@ -156,7 +157,7 @@ namespace DotPulsar.Internal
 
             ack.MessageIds.Add(messagePackage.MessageId);
 
-            await Send(ack);
+            await Send(ack, cancellationToken);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs 
b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 5424c32..9746c67 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -65,7 +65,7 @@ namespace DotPulsar.Internal
             var connection = await 
_connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, 
messageQueue);
-            var response = await connection.Send(_subscribe, channel);
+            var response = await connection.Send(_subscribe, channel, 
cancellationToken);
             return new ConsumerChannel(response.ConsumerId, 
_messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs 
b/src/DotPulsar/Internal/NotReadyChannel.cs
index 4dc0ce9..ca958b7 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -28,17 +28,18 @@ namespace DotPulsar.Internal
 
         public ValueTask<Message> Receive(CancellationToken cancellationToken 
= default) => throw GetException();
 
-        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload) 
=> throw GetException();
+        public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, 
CancellationToken cancellationToken) => throw GetException();
 
-        public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> payload) => throw GetException();
+        public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken) 
=>
+            throw GetException();
 
-        public Task Send(CommandAck command) => throw GetException();
+        public Task Send(CommandAck command, CancellationToken 
cancellationToken) => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandUnsubscribe command) => throw 
GetException();
+        public Task<CommandSuccess> Send(CommandUnsubscribe command, 
CancellationToken cancellationToken) => throw GetException();
 
-        public Task<CommandSuccess> Send(CommandSeek command) => throw 
GetException();
+        public Task<CommandSuccess> Send(CommandSeek command, 
CancellationToken cancellationToken) => throw GetException();
 
-        public Task<CommandGetLastMessageIdResponse> 
Send(CommandGetLastMessageId command) => throw GetException();
+        public Task<CommandGetLastMessageIdResponse> 
Send(CommandGetLastMessageId command, CancellationToken cancellationToken) => 
throw GetException();
 
         private Exception GetException() => new ChannelNotReadyException();
     }
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs 
b/src/DotPulsar/Internal/PingPongHandler.cs
index a0e0c23..8b16349 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -12,6 +12,7 @@
  * limitations under the License.
  */
 
+using System.Threading;
 using DotPulsar.Internal.Abstractions;
 using DotPulsar.Internal.PulsarApi;
 
@@ -28,9 +29,9 @@ namespace DotPulsar.Internal
             _pong = new CommandPong();
         }
 
-        public void Incoming(CommandPing ping)
+        public void Incoming(CommandPing ping, CancellationToken 
cancellationToken)
         {
-            _ = _connection.Send(_pong);
+            _ = _connection.Send(_pong, cancellationToken);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index f5750f1..2d850cf 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -76,7 +76,7 @@ namespace DotPulsar.Internal
         public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, 
CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => _channel.Send(data), 
cancellationToken);
+            var response = await _executor.Execute(() => _channel.Send(data, 
cancellationToken), cancellationToken);
             return new MessageId(response.MessageId);
         }
 
@@ -89,7 +89,7 @@ namespace DotPulsar.Internal
         public async ValueTask<MessageId> Send(MessageMetadata metadata, 
ReadOnlySequence<byte> data, CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
-            var response = await _executor.Execute(() => 
_channel.Send(metadata.Metadata, data), cancellationToken);
+            var response = await _executor.Execute(() => 
_channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken);
             return new MessageId(response.MessageId);
         }
 
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs 
b/src/DotPulsar/Internal/ProducerChannel.cs
index d9dc561..49b70a9 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -17,6 +17,7 @@ using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
 using System;
 using System.Buffers;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace DotPulsar.Internal
@@ -53,7 +54,7 @@ namespace DotPulsar.Internal
         {
             try
             {
-                await _connection.Send(new CommandCloseProducer { ProducerId = 
_id });
+                await _connection.Send(new CommandCloseProducer { ProducerId = 
_id }, CancellationToken.None);
             }
             catch
             {
@@ -61,22 +62,22 @@ namespace DotPulsar.Internal
             }
         }
 
-        public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> 
payload)
+        public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> 
payload, CancellationToken cancellationToken)
         {
             _cachedSendPackage.Metadata = _cachedMetadata;
             _cachedSendPackage.Payload = payload;
-            return await SendPackage(true);
+            return await SendPackage(true, cancellationToken);
         }
 
-        public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> payload)
+        public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata 
metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
         {
             metadata.ProducerName = _cachedMetadata.ProducerName;
             _cachedSendPackage.Metadata = metadata;
             _cachedSendPackage.Payload = payload;
-            return await SendPackage(metadata.SequenceId == 0);
+            return await SendPackage(metadata.SequenceId == 0, 
cancellationToken);
         }
 
-        private async Task<CommandSendReceipt> SendPackage(bool 
autoAssignSequenceId)
+        private async Task<CommandSendReceipt> SendPackage(bool 
autoAssignSequenceId, CancellationToken cancellationToken)
         {
             try
             {
@@ -90,7 +91,7 @@ namespace DotPulsar.Internal
                 else
                     _cachedSendPackage.Command.SequenceId = 
_cachedSendPackage.Metadata.SequenceId;
 
-                var response = await _connection.Send(_cachedSendPackage);
+                var response = await _connection.Send(_cachedSendPackage, 
cancellationToken);
                 response.Expect(BaseCommand.Type.SendReceipt);
 
                 if (autoAssignSequenceId)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs 
b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index da31a54..7426c22 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -56,7 +56,7 @@ namespace DotPulsar.Internal
         {
             var connection = await 
_connectionPool.FindConnectionForTopic(_commandProducer.Topic, 
cancellationToken);
             var channel = new Channel(_correlationId, _eventRegister, new 
AsyncQueue<MessagePackage>());
-            var response = await connection.Send(_commandProducer, channel);
+            var response = await connection.Send(_commandProducer, channel, 
cancellationToken);
             return new ProducerChannel(response.ProducerId, 
response.ProducerName, _sequenceId, connection);
         }
     }
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs 
b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index e95687e..e140854 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -64,7 +64,7 @@ namespace DotPulsar.Internal
             var connection = await 
_connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
             var messageQueue = new AsyncQueue<MessagePackage>();
             var channel = new Channel(_correlationId, _eventRegister, 
messageQueue);
-            var response = await connection.Send(_subscribe, channel);
+            var response = await connection.Send(_subscribe, channel, 
cancellationToken);
             return new ConsumerChannel(response.ConsumerId, 
_messagePrefetchCount, messageQueue, connection, _batchHandler);
         }
     }

Reply via email to