This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f141fa8  Guard Consumers/Readers and Producers from sending messages 
that the broker will ignore if it has sent a CommandClose[Producer/Consumer]
f141fa8 is described below

commit f141fa82a633812d2cefeebee283036e9183f903
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Mar 15 12:52:13 2021 +0100

    Guard Consumers/Readers and Producers from sending messages that the broker 
will ignore if it has sent a CommandClose[Producer/Consumer]
---
 src/DotPulsar/Internal/Abstractions/IChannel.cs    |   3 +
 src/DotPulsar/Internal/Abstractions/IRequest.cs    |   6 +-
 src/DotPulsar/Internal/Channel.cs                  |  49 ++++++-
 src/DotPulsar/Internal/ChannelManager.cs           | 162 +++++++++++++++++----
 src/DotPulsar/Internal/Connection.cs               | 150 ++++++++++---------
 src/DotPulsar/Internal/RequestResponseHandler.cs   | 124 +++++++++++-----
 src/DotPulsar/Internal/Requests/ConnectRequest.cs  |   6 +
 src/DotPulsar/Internal/Requests/SendRequest.cs     |  18 ++-
 src/DotPulsar/Internal/Requests/StandardRequest.cs |  31 +++-
 9 files changed, 405 insertions(+), 144 deletions(-)

diff --git a/src/DotPulsar/Internal/Abstractions/IChannel.cs 
b/src/DotPulsar/Internal/Abstractions/IChannel.cs
index 40f8af5..d9409dd 100644
--- a/src/DotPulsar/Internal/Abstractions/IChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IChannel.cs
@@ -14,6 +14,8 @@
 
 namespace DotPulsar.Internal.Abstractions
 {
+    using System;
+
     public interface IChannel
     {
         void Received(MessagePackage message);
@@ -24,5 +26,6 @@ namespace DotPulsar.Internal.Abstractions
         void Disconnected();
         void ReachedEndOfTopic();
         void Unsubscribed();
+        IDisposable SenderLock();
     }
 }
diff --git a/src/DotPulsar/Internal/Abstractions/IRequest.cs 
b/src/DotPulsar/Internal/Abstractions/IRequest.cs
index f97314f..8ea7913 100644
--- a/src/DotPulsar/Internal/Abstractions/IRequest.cs
+++ b/src/DotPulsar/Internal/Abstractions/IRequest.cs
@@ -16,5 +16,9 @@ namespace DotPulsar.Internal.Abstractions
 {
     using System;
 
-    public interface IRequest : IEquatable<IRequest> { }
+    public interface IRequest : IEquatable<IRequest>
+    {
+        bool SenderIsProducer(ulong producerId);
+        bool SenderIsConsumer(ulong consumerId);
+    }
 }
diff --git a/src/DotPulsar/Internal/Channel.cs 
b/src/DotPulsar/Internal/Channel.cs
index 8d71572..b441680 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -17,15 +17,18 @@ namespace DotPulsar.Internal
     using Abstractions;
     using Events;
     using System;
+    using System.Threading;
 
     public sealed class Channel : IChannel
     {
+        private readonly Lock _senderLock;
         private readonly Guid _correlationId;
         private readonly IRegisterEvent _eventRegister;
         private readonly IEnqueue<MessagePackage> _enqueue;
 
         public Channel(Guid correlationId, IRegisterEvent eventRegister, 
IEnqueue<MessagePackage> enqueue)
         {
+            _senderLock = new Lock();
             _correlationId = correlationId;
             _eventRegister = eventRegister;
             _enqueue = enqueue;
@@ -38,7 +41,10 @@ namespace DotPulsar.Internal
             => _eventRegister.Register(new ChannelActivated(_correlationId));
 
         public void ClosedByServer()
-            => _eventRegister.Register(new 
ChannelClosedByServer(_correlationId));
+        {
+            _senderLock.Disable();
+            _eventRegister.Register(new ChannelClosedByServer(_correlationId));
+        }
 
         public void Connected()
             => _eventRegister.Register(new ChannelConnected(_correlationId));
@@ -47,12 +53,51 @@ namespace DotPulsar.Internal
             => _eventRegister.Register(new ChannelDeactivated(_correlationId));
 
         public void Disconnected()
-            => _eventRegister.Register(new 
ChannelDisconnected(_correlationId));
+        {
+            _senderLock.Disable();
+            _eventRegister.Register(new ChannelDisconnected(_correlationId));
+        }
 
         public void ReachedEndOfTopic()
             => _eventRegister.Register(new 
ChannelReachedEndOfTopic(_correlationId));
 
         public void Unsubscribed()
             => _eventRegister.Register(new 
ChannelUnsubscribed(_correlationId));
+
+        public IDisposable SenderLock()
+            => _senderLock.Enter();
+
+        private sealed class Lock : IDisposable
+        {
+            private readonly object _lock;
+            private bool _canSend;
+
+            public Lock()
+            {
+                _lock = new object();
+                _canSend = true;
+            }
+
+            public void Disable()
+            {
+                Monitor.Enter(_lock);
+                _canSend = false;
+                Monitor.Exit(_lock);
+            }
+
+            public IDisposable Enter()
+            {
+                Monitor.Enter(_lock);
+
+                if (_canSend)
+                    return this;
+
+                Monitor.Exit(_lock);
+                throw new OperationCanceledException();
+            }
+
+            public void Dispose()
+                => Monitor.Exit(_lock);
+        }
     }
 }
diff --git a/src/DotPulsar/Internal/ChannelManager.cs 
b/src/DotPulsar/Internal/ChannelManager.cs
index d2eda4d..ed3fa51 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -23,29 +23,37 @@ namespace DotPulsar.Internal
 
     public sealed class ChannelManager : IDisposable
     {
+        private readonly RequestResponseHandler _requestResponseHandler;
         private readonly IdLookup<IChannel> _consumerChannels;
         private readonly IdLookup<IChannel> _producerChannels;
+        private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> 
_incoming;
 
         public ChannelManager()
         {
+            _requestResponseHandler = new RequestResponseHandler();
             _consumerChannels = new IdLookup<IChannel>();
             _producerChannels = new IdLookup<IChannel>();
+            _incoming = new EnumLookup<BaseCommand.Type, 
Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
+            _incoming.Set(BaseCommand.Type.CloseConsumer, cmd => 
Incoming(cmd.CloseConsumer));
+            _incoming.Set(BaseCommand.Type.CloseProducer, cmd => 
Incoming(cmd.CloseProducer));
+            _incoming.Set(BaseCommand.Type.ActiveConsumerChange, cmd => 
Incoming(cmd.ActiveConsumerChange));
+            _incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => 
Incoming(cmd.ReachedEndOfTopic));
         }
 
         public bool HasChannels()
             => !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
 
-        public Task<ProducerResponse> Outgoing(CommandProducer command, 
Task<BaseCommand> response, IChannel channel)
+        public Task<ProducerResponse> Outgoing(CommandProducer command, 
IChannel channel)
         {
             var producerId = _producerChannels.Add(channel);
-
             command.ProducerId = producerId;
+            var response = _requestResponseHandler.Outgoing(command);
 
             return response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Error)
                 {
-                    _producerChannels.Remove(producerId);
+                    _ = _producerChannels.Remove(producerId);
                     result.Result.Error.Throw();
                 }
 
@@ -55,65 +63,164 @@ namespace DotPulsar.Internal
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
         }
 
-        public Task<SubscribeResponse> Outgoing(CommandSubscribe command, 
Task<BaseCommand> response, IChannel channel)
+        public Task<SubscribeResponse> Outgoing(CommandSubscribe command, 
IChannel channel)
         {
             var consumerId = _consumerChannels.Add(channel);
-
             command.ConsumerId = consumerId;
+            var response = _requestResponseHandler.Outgoing(command);
 
             return response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Error)
                 {
-                    _consumerChannels.Remove(consumerId);
+                    _ = _consumerChannels.Remove(consumerId);
                     result.Result.Error.Throw();
                 }
 
                 channel.Connected();
+
                 return new SubscribeResponse(consumerId);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
         }
 
-        public void Outgoing(CommandCloseConsumer command, Task<BaseCommand> 
response)
+        public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
         {
             var consumerId = command.ConsumerId;
 
+            Task<BaseCommand> response;
+
+            using (TakeConsumerSenderLock(consumerId))
+            {
+                response = _requestResponseHandler.Outgoing(command);
+            }
+
             _ = response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Success)
-                    _consumerChannels.Remove(consumerId);
+                    _ = _consumerChannels.Remove(consumerId);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+            return response;
         }
 
-        public void Outgoing(CommandCloseProducer command, Task<BaseCommand> 
response)
+        public Task<BaseCommand> Outgoing(CommandCloseProducer command)
         {
             var producerId = command.ProducerId;
 
+            Task<BaseCommand> response;
+
+            using (TakeProducerSenderLock(producerId))
+            {
+                response = _requestResponseHandler.Outgoing(command);
+            }
+
             _ = response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Success)
-                    _producerChannels.Remove(producerId);
+                    _ = _producerChannels.Remove(producerId);
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+            return response;
         }
 
-        public void Outgoing(CommandUnsubscribe command, Task<BaseCommand> 
response)
+        public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
         {
             var consumerId = command.ConsumerId;
 
+            Task<BaseCommand> response;
+
+            using (TakeConsumerSenderLock(consumerId))
+            {
+                response = _requestResponseHandler.Outgoing(command);
+            }
+
             _ = response.ContinueWith(result =>
             {
                 if (result.Result.CommandType == BaseCommand.Type.Success)
-                    _consumerChannels.Remove(consumerId)?.Unsubscribed();
+                   _consumerChannels.Remove(consumerId)?.Unsubscribed();
             }, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+            return response;
+        }
+
+        public Task<BaseCommand> Outgoing(CommandSend command)
+        {
+            using (TakeProducerSenderLock(command.ProducerId))
+            {
+                return _requestResponseHandler.Outgoing(command);
+            }
+        }
+
+        public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
+            => _requestResponseHandler.Outgoing(command);
+
+        public Task<BaseCommand> Outgoing(CommandConnect command)
+            => _requestResponseHandler.Outgoing(command);
+
+        public Task<BaseCommand> Outgoing(CommandLookupTopic command)
+            => _requestResponseHandler.Outgoing(command);
+
+        public Task<BaseCommand> Outgoing(CommandSeek command)
+        {
+            using (TakeConsumerSenderLock(command.ConsumerId))
+            {
+                return _requestResponseHandler.Outgoing(command);
+            }
+        }
+
+        public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
+        {
+            using (TakeConsumerSenderLock(command.ConsumerId))
+            {
+                return _requestResponseHandler.Outgoing(command);
+            }
         }
 
-        public void Incoming(CommandCloseConsumer command)
-            => _consumerChannels.Remove(command.ConsumerId)?.ClosedByServer();
+        public void Incoming(BaseCommand command)
+            => _incoming.Get(command.CommandType)(command);
 
-        public void Incoming(CommandCloseProducer command)
-            => _producerChannels.Remove(command.ProducerId)?.ClosedByServer();
+        public void Incoming(CommandMessage command, ReadOnlySequence<byte> 
data)
+            => _consumerChannels[command.ConsumerId]?.Received(new 
MessagePackage(command.MessageId, command.RedeliveryCount, data));
 
-        public void Incoming(CommandActiveConsumerChange command)
+        public void Dispose()
+        {
+            _requestResponseHandler.Dispose();
+
+            foreach (var channel in _consumerChannels.RemoveAll())
+                channel.Disconnected();
+
+            foreach (var channel in _producerChannels.RemoveAll())
+                channel.Disconnected();
+        }
+
+        private void Incoming(CommandReachedEndOfTopic command)
+            => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
+
+        private void Incoming(CommandCloseConsumer command)
+        {
+            var channel = _consumerChannels[command.ConsumerId];
+
+            if (channel is null)
+                return;
+
+            _ = _consumerChannels.Remove(command.ConsumerId);
+            _requestResponseHandler.Incoming(command);
+            channel.ClosedByServer();
+        }
+
+        private void Incoming(CommandCloseProducer command)
+        {
+            var channel = _producerChannels[command.ProducerId];
+
+            if (channel is null)
+                return;
+
+            _ = _producerChannels.Remove(command.ProducerId);
+            _requestResponseHandler.Incoming(command);
+            channel.ClosedByServer();
+        }
+
+        private void Incoming(CommandActiveConsumerChange command)
         {
             var channel = _consumerChannels[command.ConsumerId];
 
@@ -126,19 +233,22 @@ namespace DotPulsar.Internal
                 channel.Deactivated();
         }
 
-        public void Incoming(CommandReachedEndOfTopic command)
-            => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
+        private IDisposable TakeConsumerSenderLock(ulong consumerId)
+        {
+            var channel = _consumerChannels[consumerId];
+            if (channel is null)
+                throw new OperationCanceledException();
 
-        public void Incoming(CommandMessage command, ReadOnlySequence<byte> 
data)
-            => _consumerChannels[command.ConsumerId]?.Received(new 
MessagePackage(command.MessageId, command.RedeliveryCount, data));
+            return channel.SenderLock();
+        }
 
-        public void Dispose()
+        private IDisposable TakeProducerSenderLock(ulong producerId)
         {
-            foreach (var channel in _consumerChannels.RemoveAll())
-                channel.Disconnected();
+            var channel = _producerChannels[producerId];
+            if (channel is null)
+                throw new OperationCanceledException();
 
-            foreach (var channel in _producerChannels.RemoveAll())
-                channel.Disconnected();
+            return channel.SenderLock();
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Connection.cs 
b/src/DotPulsar/Internal/Connection.cs
index 8b24df8..efc1b12 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,7 +18,6 @@ namespace DotPulsar.Internal
     using Exceptions;
     using Extensions;
     using PulsarApi;
-    using System;
     using System.Buffers;
     using System.Threading;
     using System.Threading.Tasks;
@@ -27,7 +26,6 @@ namespace DotPulsar.Internal
     {
         private readonly AsyncLock _lock;
         private readonly ChannelManager _channelManager;
-        private readonly RequestResponseHandler _requestResponseHandler;
         private readonly PingPongHandler _pingPongHandler;
         private readonly IPulsarStream _stream;
         private int _isDisposed;
@@ -36,7 +34,6 @@ namespace DotPulsar.Internal
         {
             _lock = new AsyncLock();
             _channelManager = new ChannelManager();
-            _requestResponseHandler = new RequestResponseHandler();
             _pingPongHandler = new PingPongHandler(this);
             _stream = stream;
         }
@@ -59,10 +56,8 @@ namespace DotPulsar.Internal
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                var requestResponseTask = 
_requestResponseHandler.Outgoing(baseCommand);
-                responseTask = _channelManager.Outgoing(command, 
requestResponseTask, channel);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command, channel);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
@@ -77,10 +72,8 @@ namespace DotPulsar.Internal
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                var requestResponseTask = 
_requestResponseHandler.Outgoing(baseCommand);
-                responseTask = _channelManager.Outgoing(command, 
requestResponseTask, channel);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command, channel);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
@@ -110,29 +103,31 @@ namespace DotPulsar.Internal
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                _channelManager.Outgoing(command, responseTask);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
             return await responseTask.ConfigureAwait(false);
         }
 
-        public Task<BaseCommand> Send(CommandConnect command, 
CancellationToken cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+        public async Task<BaseCommand> Send(CommandConnect command, 
CancellationToken cancellationToken)
+        {
+            ThrowIfDisposed();
 
-        public Task<BaseCommand> Send(CommandLookupTopic command, 
CancellationToken cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+            Task<BaseCommand>? responseTask;
 
-        public Task<BaseCommand> Send(CommandSeek command, CancellationToken 
cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
 
-        public Task<BaseCommand> Send(CommandGetLastMessageId command, 
CancellationToken cancellationToken)
-            => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+            return await responseTask.ConfigureAwait(false);
+        }
 
-        public async Task<BaseCommand> Send(CommandCloseProducer command, 
CancellationToken cancellationToken)
+        public async Task<BaseCommand> Send(CommandLookupTopic command, 
CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
@@ -140,17 +135,15 @@ namespace DotPulsar.Internal
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                _channelManager.Outgoing(command, responseTask);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
             return await responseTask.ConfigureAwait(false);
         }
 
-        public async Task<BaseCommand> Send(CommandCloseConsumer command, 
CancellationToken cancellationToken)
+        public async Task<BaseCommand> Send(CommandSeek command, 
CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
@@ -158,34 +151,31 @@ namespace DotPulsar.Internal
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                _channelManager.Outgoing(command, responseTask);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
             return await responseTask.ConfigureAwait(false);
         }
 
-        public async Task<BaseCommand> Send(SendPackage command, 
CancellationToken cancellationToken)
+        public async Task<BaseCommand> Send(CommandGetLastMessageId command, 
CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
-            Task<BaseCommand>? response;
+            Task<BaseCommand>? responseTask;
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.Command!.AsBaseCommand();
-                response = _requestResponseHandler.Outgoing(baseCommand);
-                var sequence = Serializer.Serialize(baseCommand, 
command.Metadata!, command.Payload);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
-            return await response.ConfigureAwait(false);
+            return await responseTask.ConfigureAwait(false);
         }
 
-        public async Task<BaseCommand> Send(CommandGetOrCreateSchema command, 
CancellationToken cancellationToken)
+        public async Task<BaseCommand> Send(CommandCloseProducer command, 
CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
@@ -193,58 +183,77 @@ namespace DotPulsar.Internal
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var baseCommand = command.AsBaseCommand();
-                responseTask = _requestResponseHandler.Outgoing(baseCommand);
-                var sequence = Serializer.Serialize(baseCommand);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
             return await responseTask.ConfigureAwait(false);
         }
 
-        private async Task<BaseCommand> SendRequestResponse(BaseCommand 
command, CancellationToken cancellationToken)
+        public async Task<BaseCommand> Send(CommandCloseConsumer command, 
CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
-            Task<BaseCommand>? response;
+            Task<BaseCommand>? responseTask;
 
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                response = _requestResponseHandler.Outgoing(command);
-                var sequence = Serializer.Serialize(command);
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
 
-            return await response.ConfigureAwait(false);
+            return await responseTask.ConfigureAwait(false);
         }
 
-        private async Task Send(BaseCommand command, CancellationToken 
cancellationToken)
+        public async Task<BaseCommand> Send(SendPackage command, 
CancellationToken cancellationToken)
         {
             ThrowIfDisposed();
 
+            Task<BaseCommand>? responseTask;
+
             using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                var sequence = Serializer.Serialize(command);
+                responseTask = _channelManager.Outgoing(command.Command!);
+                var sequence = 
Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!, 
command.Payload);
                 await _stream.Send(sequence).ConfigureAwait(false);
             }
+
+            return await responseTask.ConfigureAwait(false);
         }
 
-        public async Task ProcessIncommingFrames()
+        public async Task<BaseCommand> Send(CommandGetOrCreateSchema command, 
CancellationToken cancellationToken)
         {
-            await Task.Yield();
+            ThrowIfDisposed();
 
-            var lookup = new EnumLookup<BaseCommand.Type, 
Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
+            Task<BaseCommand>? responseTask;
 
-            lookup.Set(BaseCommand.Type.CloseConsumer, cmd => 
_channelManager.Incoming(cmd.CloseConsumer));
-            lookup.Set(BaseCommand.Type.ActiveConsumerChange, cmd => 
_channelManager.Incoming(cmd.ActiveConsumerChange));
-            lookup.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => 
_channelManager.Incoming(cmd.ReachedEndOfTopic));
-            lookup.Set(BaseCommand.Type.Ping, cmd => 
_pingPongHandler.GotPing());
-            lookup.Set(BaseCommand.Type.CloseProducer, cmd =>
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
             {
-                _channelManager.Incoming(cmd.CloseProducer);
-                _requestResponseHandler.Incoming(cmd.CloseProducer);
-            });
-            
+                responseTask = _channelManager.Outgoing(command);
+                var sequence = Serializer.Serialize(command.AsBaseCommand());
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
+
+            return await responseTask.ConfigureAwait(false);
+        }
+
+        private async Task Send(BaseCommand command, CancellationToken 
cancellationToken)
+        {
+            ThrowIfDisposed();
+
+            var sequence = Serializer.Serialize(command);
+
+            using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+            {
+                await _stream.Send(sequence).ConfigureAwait(false);
+            }
+        }
+
+        public async Task ProcessIncommingFrames()
+        {
+            await Task.Yield();
 
             try
             {
@@ -253,10 +262,18 @@ namespace DotPulsar.Internal
                     var commandSize = frame.ReadUInt32(0, true);
                     var command = 
Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
 
-                    if (command.CommandType == BaseCommand.Type.Message)
-                        _channelManager.Incoming(command.Message, new 
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
-                    else
-                        lookup.Get(command.CommandType)(command);
+                    switch (command.CommandType)
+                    {
+                        case BaseCommand.Type.Message:
+                            _channelManager.Incoming(command.Message, new 
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
+                            break;
+                        case BaseCommand.Type.Ping:
+                            _pingPongHandler.GotPing();
+                            break;
+                        default:
+                            _channelManager.Incoming(command);
+                            break;
+                    }
                 }
             }
             catch
@@ -271,7 +288,6 @@ namespace DotPulsar.Internal
                 return;
 
             await _lock.DisposeAsync().ConfigureAwait(false);
-            _requestResponseHandler.Dispose();
             _channelManager.Dispose();
             await _stream.DisposeAsync().ConfigureAwait(false);
         }
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs 
b/src/DotPulsar/Internal/RequestResponseHandler.cs
index b3b7d0a..5cbd632 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -24,57 +24,97 @@ namespace DotPulsar.Internal
     {
         private readonly RequestId _requestId;
         private readonly Awaiter<IRequest, BaseCommand> _requests;
-        private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> _setRequestId;
         private readonly EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>> _getResponseIdentifier;
 
         public RequestResponseHandler()
         {
             _requestId = new RequestId();
-
             _requests = new Awaiter<IRequest, BaseCommand>();
 
-            _setRequestId = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => { });
-            _setRequestId.Set(BaseCommand.Type.Seek, cmd => cmd.Seek.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Error, cmd => cmd.Error.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Producer, cmd => cmd.Producer.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Lookup, cmd => cmd.LookupTopic.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Subscribe, cmd => cmd.Subscribe.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.Unsubscribe, cmd => cmd.Unsubscribe.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.CloseConsumer, cmd => cmd.CloseConsumer.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.CloseProducer, cmd => cmd.CloseProducer.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.GetLastMessageId, cmd => cmd.GetLastMessageId.RequestId = _requestId.FetchNext());
-            _setRequestId.Set(BaseCommand.Type.GetOrCreateSchema, cmd => cmd.GetOrCreateSchema.RequestId = _requestId.FetchNext());
-
-            _getResponseIdentifier = new EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>>(cmd => throw new ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType, "CommandType not supported as request/response type"));
-            _getResponseIdentifier.Set(BaseCommand.Type.Connect, cmd => new ConnectRequest());
+            _getResponseIdentifier = new EnumLookup<BaseCommand.Type, Func<BaseCommand, IRequest>>(cmd => throw new Exception($"CommandType '{cmd.CommandType}' not supported as request/response type"));
             _getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd => new ConnectRequest());
-            _getResponseIdentifier.Set(BaseCommand.Type.Seek, cmd => new StandardRequest(cmd.Seek.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Send, cmd => new SendRequest(cmd.Send.ProducerId, cmd.Send.SequenceId));
             _getResponseIdentifier.Set(BaseCommand.Type.SendError, cmd => new SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId));
             _getResponseIdentifier.Set(BaseCommand.Type.SendReceipt, cmd => new SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Producer, cmd => new StandardRequest(cmd.Producer.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd => new StandardRequest(cmd.ProducerSuccess.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd => new StandardRequest(cmd.CloseConsumer.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => new StandardRequest(cmd.CloseProducer.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Lookup, cmd => new StandardRequest(cmd.LookupTopic.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => new StandardRequest(cmd.LookupTopicResponse.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Subscribe, cmd => new StandardRequest(cmd.Subscribe.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Unsubscribe, cmd => new StandardRequest(cmd.Unsubscribe.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageId, cmd => new StandardRequest(cmd.GetLastMessageId.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd => new StandardRequest(cmd.GetLastMessageIdResponse.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchema, cmd => new StandardRequest(cmd.GetOrCreateSchema.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd => new StandardRequest(cmd.GetOrCreateSchemaResponse.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => new StandardRequest(cmd.Success.RequestId));
-            _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd => !_requestId.IsPastInitialId() ? new ConnectRequest() : new StandardRequest(cmd.Error.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd => StandardRequest.WithRequestId(cmd.ProducerSuccess.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd => StandardRequest.WithConsumerId(cmd.CloseConsumer.RequestId, cmd.CloseConsumer.ConsumerId));
+            _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd => StandardRequest.WithProducerId(cmd.CloseProducer.RequestId, cmd.CloseProducer.ProducerId));
+            _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd => StandardRequest.WithRequestId(cmd.LookupTopicResponse.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd => StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd => StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => StandardRequest.WithRequestId(cmd.Success.RequestId));
+            _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd => !_requestId.IsPastInitialId() ? new ConnectRequest() : StandardRequest.WithRequestId(cmd.Error.RequestId));
         }
 
         public void Dispose()
-                => _requests.Dispose();
+            => _requests.Dispose();
+
+        public Task<BaseCommand> Outgoing(CommandProducer command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithProducerId(command.RequestId, command.ProducerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandCloseProducer command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithProducerId(command.RequestId, command.ProducerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandSubscribe command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandSend command)
+        {
+            var request = new SendRequest(command.ProducerId, command.SequenceId);
+            return _requests.CreateTask(request);
+        }
 
-        public Task<BaseCommand> Outgoing(BaseCommand command)
+        public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
         {
-            _setRequestId.Get(command.CommandType)(command);
-            return _requests.CreateTask(_getResponseIdentifier.Get(command.CommandType)(command));
+            command.RequestId = _requestId.FetchNext();
+            var request = StandardRequest.WithRequestId(command.RequestId);
+            return _requests.CreateTask(request);
+        }
+
+        public Task<BaseCommand> Outgoing(CommandConnect _1)
+            => _requests.CreateTask(new ConnectRequest());
+
+        public Task<BaseCommand> Outgoing(CommandLookupTopic command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            return _requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
+        }
+
+        public Task<BaseCommand> Outgoing(CommandSeek command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            return _requests.CreateTask(StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId));
+        }
+
+        public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
+        {
+            command.RequestId = _requestId.FetchNext();
+            return _requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
         }
 
         public void Incoming(BaseCommand command)
@@ -85,12 +125,22 @@ namespace DotPulsar.Internal
                 _requests.SetResult(identifier, command);
         }
 
+        public void Incoming(CommandCloseConsumer command)
+        {
+            var requests = _requests.Keys;
+            foreach (var request in requests)
+            {
+                if (request.SenderIsConsumer(command.ConsumerId))
+                    _requests.Cancel(request);
+            }
+        }
+
         public void Incoming(CommandCloseProducer command)
         {
             var requests = _requests.Keys;
             foreach (var request in requests)
             {
-                if (request is SendRequest sendRequest && sendRequest.ProducerId == command.ProducerId)
+                if (request.SenderIsProducer(command.ProducerId))
                     _requests.Cancel(request);
             }
         }
diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
index ceca386..5ab0e20 100644
--- a/src/DotPulsar/Internal/Requests/ConnectRequest.cs
+++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
@@ -19,6 +19,12 @@ namespace DotPulsar.Internal.Requests
 
     public struct ConnectRequest : IRequest
     {
+        public bool SenderIsConsumer(ulong consumerId)
+            => false;
+
+        public bool SenderIsProducer(ulong producerId)
+            => false;
+
 #if NETSTANDARD2_0
         public bool Equals(IRequest other)
 #else
diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs b/src/DotPulsar/Internal/Requests/SendRequest.cs
index 5f15fd2..36498b6 100644
--- a/src/DotPulsar/Internal/Requests/SendRequest.cs
+++ b/src/DotPulsar/Internal/Requests/SendRequest.cs
@@ -20,15 +20,21 @@ namespace DotPulsar.Internal.Requests
 
     public struct SendRequest : IRequest
     {
-        public ulong ProducerId { get; }
-        public ulong SequenceId { get; }
+        private readonly ulong _producerId;
+        private readonly ulong _sequenceId;
 
         public SendRequest(ulong producerId, ulong sequenceId)
         {
-            ProducerId = producerId;
-            SequenceId = sequenceId;
+            _producerId = producerId;
+            _sequenceId = sequenceId;
         }
 
+        public bool SenderIsConsumer(ulong consumerId)
+            => false;
+
+        public bool SenderIsProducer(ulong producerId)
+            => _producerId == producerId;
+
 #if NETSTANDARD2_0
         public bool Equals(IRequest other)
 #else
@@ -36,12 +42,12 @@ namespace DotPulsar.Internal.Requests
 #endif
         {
             if (other is SendRequest request)
-                return ProducerId.Equals(request.ProducerId) && SequenceId.Equals(request.SequenceId);
+                return _producerId.Equals(request._producerId) && _sequenceId.Equals(request._sequenceId);
 
             return false;
         }
 
         public override int GetHashCode()
-            => HashCode.Combine(ProducerId, SequenceId);
+            => HashCode.Combine(_producerId, _sequenceId);
     }
 }
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs b/src/DotPulsar/Internal/Requests/StandardRequest.cs
index 61b656e..666b227 100644
--- a/src/DotPulsar/Internal/Requests/StandardRequest.cs
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -20,10 +20,31 @@ namespace DotPulsar.Internal.Requests
 
     public struct StandardRequest : IRequest
     {
-        public ulong RequestId { get; }
+        private readonly ulong _requestId;
+        private readonly ulong? _consumerId;
+        private readonly ulong? _producerId;
 
-        public StandardRequest(ulong requestId)
-            => RequestId = requestId;
+        private StandardRequest(ulong requestId, ulong? consumerId, ulong? producerId)
+        {
+            _requestId = requestId;
+            _consumerId = consumerId;
+            _producerId = producerId;
+        }
+
+        public static StandardRequest WithRequestId(ulong requestId)
+            => new(requestId, null, null);
+
+        public static StandardRequest WithConsumerId(ulong requestId, ulong consumerId)
+            => new(requestId, consumerId, null);
+
+        public static StandardRequest WithProducerId(ulong requestId, ulong producerId)
+            => new (requestId, null, producerId);
+
+        public bool SenderIsConsumer(ulong consumerId)
+            => _consumerId.HasValue && _consumerId.Value == consumerId;
+
+        public bool SenderIsProducer(ulong producerId)
+            => _producerId.HasValue && _producerId.Value == producerId;
 
 #if NETSTANDARD2_0
         public bool Equals(IRequest other)
@@ -32,12 +53,12 @@ namespace DotPulsar.Internal.Requests
 #endif
         {
             if (other is StandardRequest request)
-                return RequestId.Equals(request.RequestId);
+                return _requestId.Equals(request._requestId);
 
             return false;
         }
 
         public override int GetHashCode()
-            => HashCode.Combine(RequestId);
+            => HashCode.Combine(_requestId);
     }
 }

Reply via email to