This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f141fa8 Guard Consumers/Readers and Producers from sending messages
that the broker will ignore if it has sent a CommandClose[Producer/Consumer]
f141fa8 is described below
commit f141fa82a633812d2cefeebee283036e9183f903
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Mon Mar 15 12:52:13 2021 +0100
Guard Consumers/Readers and Producers from sending messages that the broker
will ignore if it has sent a CommandClose[Producer/Consumer]
---
src/DotPulsar/Internal/Abstractions/IChannel.cs | 3 +
src/DotPulsar/Internal/Abstractions/IRequest.cs | 6 +-
src/DotPulsar/Internal/Channel.cs | 49 ++++++-
src/DotPulsar/Internal/ChannelManager.cs | 162 +++++++++++++++++----
src/DotPulsar/Internal/Connection.cs | 150 ++++++++++---------
src/DotPulsar/Internal/RequestResponseHandler.cs | 124 +++++++++++-----
src/DotPulsar/Internal/Requests/ConnectRequest.cs | 6 +
src/DotPulsar/Internal/Requests/SendRequest.cs | 18 ++-
src/DotPulsar/Internal/Requests/StandardRequest.cs | 31 +++-
9 files changed, 405 insertions(+), 144 deletions(-)
diff --git a/src/DotPulsar/Internal/Abstractions/IChannel.cs b/src/DotPulsar/Internal/Abstractions/IChannel.cs
index 40f8af5..d9409dd 100644
--- a/src/DotPulsar/Internal/Abstractions/IChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IChannel.cs
@@ -14,6 +14,8 @@
namespace DotPulsar.Internal.Abstractions
{
+ using System;
+
public interface IChannel
{
void Received(MessagePackage message);
@@ -24,5 +26,6 @@ namespace DotPulsar.Internal.Abstractions
void Disconnected();
void ReachedEndOfTopic();
void Unsubscribed();
+ IDisposable SenderLock();
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IRequest.cs b/src/DotPulsar/Internal/Abstractions/IRequest.cs
index f97314f..8ea7913 100644
--- a/src/DotPulsar/Internal/Abstractions/IRequest.cs
+++ b/src/DotPulsar/Internal/Abstractions/IRequest.cs
@@ -16,5 +16,9 @@ namespace DotPulsar.Internal.Abstractions
{
using System;
- public interface IRequest : IEquatable<IRequest> { }
+ public interface IRequest : IEquatable<IRequest>
+ {
+ bool SenderIsProducer(ulong producerId);
+ bool SenderIsConsumer(ulong consumerId);
+ }
}
diff --git a/src/DotPulsar/Internal/Channel.cs b/src/DotPulsar/Internal/Channel.cs
index 8d71572..b441680 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -17,15 +17,18 @@ namespace DotPulsar.Internal
using Abstractions;
using Events;
using System;
+ using System.Threading;
public sealed class Channel : IChannel
{
+ private readonly Lock _senderLock;
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private readonly IEnqueue<MessagePackage> _enqueue;
public Channel(Guid correlationId, IRegisterEvent eventRegister,
IEnqueue<MessagePackage> enqueue)
{
+ _senderLock = new Lock();
_correlationId = correlationId;
_eventRegister = eventRegister;
_enqueue = enqueue;
@@ -38,7 +41,10 @@ namespace DotPulsar.Internal
=> _eventRegister.Register(new ChannelActivated(_correlationId));
public void ClosedByServer()
- => _eventRegister.Register(new
ChannelClosedByServer(_correlationId));
+ {
+ _senderLock.Disable();
+ _eventRegister.Register(new ChannelClosedByServer(_correlationId));
+ }
public void Connected()
=> _eventRegister.Register(new ChannelConnected(_correlationId));
@@ -47,12 +53,51 @@ namespace DotPulsar.Internal
=> _eventRegister.Register(new ChannelDeactivated(_correlationId));
public void Disconnected()
- => _eventRegister.Register(new
ChannelDisconnected(_correlationId));
+ {
+ _senderLock.Disable();
+ _eventRegister.Register(new ChannelDisconnected(_correlationId));
+ }
public void ReachedEndOfTopic()
=> _eventRegister.Register(new
ChannelReachedEndOfTopic(_correlationId));
public void Unsubscribed()
=> _eventRegister.Register(new
ChannelUnsubscribed(_correlationId));
+
+ public IDisposable SenderLock()
+ => _senderLock.Enter();
+
+ private sealed class Lock : IDisposable
+ {
+ private readonly object _lock;
+ private bool _canSend;
+
+ public Lock()
+ {
+ _lock = new object();
+ _canSend = true;
+ }
+
+ public void Disable()
+ {
+ Monitor.Enter(_lock);
+ _canSend = false;
+ Monitor.Exit(_lock);
+ }
+
+ public IDisposable Enter()
+ {
+ Monitor.Enter(_lock);
+
+ if (_canSend)
+ return this;
+
+ Monitor.Exit(_lock);
+ throw new OperationCanceledException();
+ }
+
+ public void Dispose()
+ => Monitor.Exit(_lock);
+ }
}
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index d2eda4d..ed3fa51 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -23,29 +23,37 @@ namespace DotPulsar.Internal
public sealed class ChannelManager : IDisposable
{
+ private readonly RequestResponseHandler _requestResponseHandler;
private readonly IdLookup<IChannel> _consumerChannels;
private readonly IdLookup<IChannel> _producerChannels;
+ private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>>
_incoming;
public ChannelManager()
{
+ _requestResponseHandler = new RequestResponseHandler();
_consumerChannels = new IdLookup<IChannel>();
_producerChannels = new IdLookup<IChannel>();
+ _incoming = new EnumLookup<BaseCommand.Type,
Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
+ _incoming.Set(BaseCommand.Type.CloseConsumer, cmd =>
Incoming(cmd.CloseConsumer));
+ _incoming.Set(BaseCommand.Type.CloseProducer, cmd =>
Incoming(cmd.CloseProducer));
+ _incoming.Set(BaseCommand.Type.ActiveConsumerChange, cmd =>
Incoming(cmd.ActiveConsumerChange));
+ _incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd =>
Incoming(cmd.ReachedEndOfTopic));
}
public bool HasChannels()
=> !_consumerChannels.IsEmpty() || !_producerChannels.IsEmpty();
- public Task<ProducerResponse> Outgoing(CommandProducer command,
Task<BaseCommand> response, IChannel channel)
+ public Task<ProducerResponse> Outgoing(CommandProducer command,
IChannel channel)
{
var producerId = _producerChannels.Add(channel);
-
command.ProducerId = producerId;
+ var response = _requestResponseHandler.Outgoing(command);
return response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Error)
{
- _producerChannels.Remove(producerId);
+ _ = _producerChannels.Remove(producerId);
result.Result.Error.Throw();
}
@@ -55,65 +63,164 @@ namespace DotPulsar.Internal
}, TaskContinuationOptions.OnlyOnRanToCompletion);
}
- public Task<SubscribeResponse> Outgoing(CommandSubscribe command,
Task<BaseCommand> response, IChannel channel)
+ public Task<SubscribeResponse> Outgoing(CommandSubscribe command,
IChannel channel)
{
var consumerId = _consumerChannels.Add(channel);
-
command.ConsumerId = consumerId;
+ var response = _requestResponseHandler.Outgoing(command);
return response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Error)
{
- _consumerChannels.Remove(consumerId);
+ _ = _consumerChannels.Remove(consumerId);
result.Result.Error.Throw();
}
channel.Connected();
+
return new SubscribeResponse(consumerId);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
}
- public void Outgoing(CommandCloseConsumer command, Task<BaseCommand>
response)
+ public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
{
var consumerId = command.ConsumerId;
+ Task<BaseCommand> response;
+
+ using (TakeConsumerSenderLock(consumerId))
+ {
+ response = _requestResponseHandler.Outgoing(command);
+ }
+
_ = response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Success)
- _consumerChannels.Remove(consumerId);
+ _ = _consumerChannels.Remove(consumerId);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+ return response;
}
- public void Outgoing(CommandCloseProducer command, Task<BaseCommand>
response)
+ public Task<BaseCommand> Outgoing(CommandCloseProducer command)
{
var producerId = command.ProducerId;
+ Task<BaseCommand> response;
+
+ using (TakeProducerSenderLock(producerId))
+ {
+ response = _requestResponseHandler.Outgoing(command);
+ }
+
_ = response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Success)
- _producerChannels.Remove(producerId);
+ _ = _producerChannels.Remove(producerId);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+ return response;
}
- public void Outgoing(CommandUnsubscribe command, Task<BaseCommand>
response)
+ public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
{
var consumerId = command.ConsumerId;
+ Task<BaseCommand> response;
+
+ using (TakeConsumerSenderLock(consumerId))
+ {
+ response = _requestResponseHandler.Outgoing(command);
+ }
+
_ = response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Success)
- _consumerChannels.Remove(consumerId)?.Unsubscribed();
+ _consumerChannels.Remove(consumerId)?.Unsubscribed();
}, TaskContinuationOptions.OnlyOnRanToCompletion);
+
+ return response;
+ }
+
+ public Task<BaseCommand> Outgoing(CommandSend command)
+ {
+ using (TakeProducerSenderLock(command.ProducerId))
+ {
+ return _requestResponseHandler.Outgoing(command);
+ }
+ }
+
+ public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
+ => _requestResponseHandler.Outgoing(command);
+
+ public Task<BaseCommand> Outgoing(CommandConnect command)
+ => _requestResponseHandler.Outgoing(command);
+
+ public Task<BaseCommand> Outgoing(CommandLookupTopic command)
+ => _requestResponseHandler.Outgoing(command);
+
+ public Task<BaseCommand> Outgoing(CommandSeek command)
+ {
+ using (TakeConsumerSenderLock(command.ConsumerId))
+ {
+ return _requestResponseHandler.Outgoing(command);
+ }
+ }
+
+ public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
+ {
+ using (TakeConsumerSenderLock(command.ConsumerId))
+ {
+ return _requestResponseHandler.Outgoing(command);
+ }
}
- public void Incoming(CommandCloseConsumer command)
- => _consumerChannels.Remove(command.ConsumerId)?.ClosedByServer();
+ public void Incoming(BaseCommand command)
+ => _incoming.Get(command.CommandType)(command);
- public void Incoming(CommandCloseProducer command)
- => _producerChannels.Remove(command.ProducerId)?.ClosedByServer();
+ public void Incoming(CommandMessage command, ReadOnlySequence<byte>
data)
+ => _consumerChannels[command.ConsumerId]?.Received(new
MessagePackage(command.MessageId, command.RedeliveryCount, data));
- public void Incoming(CommandActiveConsumerChange command)
+ public void Dispose()
+ {
+ _requestResponseHandler.Dispose();
+
+ foreach (var channel in _consumerChannels.RemoveAll())
+ channel.Disconnected();
+
+ foreach (var channel in _producerChannels.RemoveAll())
+ channel.Disconnected();
+ }
+
+ private void Incoming(CommandReachedEndOfTopic command)
+ => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
+
+ private void Incoming(CommandCloseConsumer command)
+ {
+ var channel = _consumerChannels[command.ConsumerId];
+
+ if (channel is null)
+ return;
+
+ _ = _consumerChannels.Remove(command.ConsumerId);
+ _requestResponseHandler.Incoming(command);
+ channel.ClosedByServer();
+ }
+
+ private void Incoming(CommandCloseProducer command)
+ {
+ var channel = _producerChannels[command.ProducerId];
+
+ if (channel is null)
+ return;
+
+ _ = _producerChannels.Remove(command.ProducerId);
+ _requestResponseHandler.Incoming(command);
+ channel.ClosedByServer();
+ }
+
+ private void Incoming(CommandActiveConsumerChange command)
{
var channel = _consumerChannels[command.ConsumerId];
@@ -126,19 +233,22 @@ namespace DotPulsar.Internal
channel.Deactivated();
}
- public void Incoming(CommandReachedEndOfTopic command)
- => _consumerChannels[command.ConsumerId]?.ReachedEndOfTopic();
+ private IDisposable TakeConsumerSenderLock(ulong consumerId)
+ {
+ var channel = _consumerChannels[consumerId];
+ if (channel is null)
+ throw new OperationCanceledException();
- public void Incoming(CommandMessage command, ReadOnlySequence<byte>
data)
- => _consumerChannels[command.ConsumerId]?.Received(new
MessagePackage(command.MessageId, command.RedeliveryCount, data));
+ return channel.SenderLock();
+ }
- public void Dispose()
+ private IDisposable TakeProducerSenderLock(ulong producerId)
{
- foreach (var channel in _consumerChannels.RemoveAll())
- channel.Disconnected();
+ var channel = _producerChannels[producerId];
+ if (channel is null)
+ throw new OperationCanceledException();
- foreach (var channel in _producerChannels.RemoveAll())
- channel.Disconnected();
+ return channel.SenderLock();
}
}
}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 8b24df8..efc1b12 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,7 +18,6 @@ namespace DotPulsar.Internal
using Exceptions;
using Extensions;
using PulsarApi;
- using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
@@ -27,7 +26,6 @@ namespace DotPulsar.Internal
{
private readonly AsyncLock _lock;
private readonly ChannelManager _channelManager;
- private readonly RequestResponseHandler _requestResponseHandler;
private readonly PingPongHandler _pingPongHandler;
private readonly IPulsarStream _stream;
private int _isDisposed;
@@ -36,7 +34,6 @@ namespace DotPulsar.Internal
{
_lock = new AsyncLock();
_channelManager = new ChannelManager();
- _requestResponseHandler = new RequestResponseHandler();
_pingPongHandler = new PingPongHandler(this);
_stream = stream;
}
@@ -59,10 +56,8 @@ namespace DotPulsar.Internal
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.AsBaseCommand();
- var requestResponseTask =
_requestResponseHandler.Outgoing(baseCommand);
- responseTask = _channelManager.Outgoing(command,
requestResponseTask, channel);
- var sequence = Serializer.Serialize(baseCommand);
+ responseTask = _channelManager.Outgoing(command, channel);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
@@ -77,10 +72,8 @@ namespace DotPulsar.Internal
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.AsBaseCommand();
- var requestResponseTask =
_requestResponseHandler.Outgoing(baseCommand);
- responseTask = _channelManager.Outgoing(command,
requestResponseTask, channel);
- var sequence = Serializer.Serialize(baseCommand);
+ responseTask = _channelManager.Outgoing(command, channel);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
@@ -110,29 +103,31 @@ namespace DotPulsar.Internal
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.AsBaseCommand();
- responseTask = _requestResponseHandler.Outgoing(baseCommand);
- _channelManager.Outgoing(command, responseTask);
- var sequence = Serializer.Serialize(baseCommand);
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
return await responseTask.ConfigureAwait(false);
}
- public Task<BaseCommand> Send(CommandConnect command,
CancellationToken cancellationToken)
- => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ public async Task<BaseCommand> Send(CommandConnect command,
CancellationToken cancellationToken)
+ {
+ ThrowIfDisposed();
- public Task<BaseCommand> Send(CommandLookupTopic command,
CancellationToken cancellationToken)
- => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ Task<BaseCommand>? responseTask;
- public Task<BaseCommand> Send(CommandSeek command, CancellationToken
cancellationToken)
- => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ {
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
+ await _stream.Send(sequence).ConfigureAwait(false);
+ }
- public Task<BaseCommand> Send(CommandGetLastMessageId command,
CancellationToken cancellationToken)
- => SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+ return await responseTask.ConfigureAwait(false);
+ }
- public async Task<BaseCommand> Send(CommandCloseProducer command,
CancellationToken cancellationToken)
+ public async Task<BaseCommand> Send(CommandLookupTopic command,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -140,17 +135,15 @@ namespace DotPulsar.Internal
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.AsBaseCommand();
- responseTask = _requestResponseHandler.Outgoing(baseCommand);
- _channelManager.Outgoing(command, responseTask);
- var sequence = Serializer.Serialize(baseCommand);
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
return await responseTask.ConfigureAwait(false);
}
- public async Task<BaseCommand> Send(CommandCloseConsumer command,
CancellationToken cancellationToken)
+ public async Task<BaseCommand> Send(CommandSeek command,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -158,34 +151,31 @@ namespace DotPulsar.Internal
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.AsBaseCommand();
- responseTask = _requestResponseHandler.Outgoing(baseCommand);
- _channelManager.Outgoing(command, responseTask);
- var sequence = Serializer.Serialize(baseCommand);
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
return await responseTask.ConfigureAwait(false);
}
- public async Task<BaseCommand> Send(SendPackage command,
CancellationToken cancellationToken)
+ public async Task<BaseCommand> Send(CommandGetLastMessageId command,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
- Task<BaseCommand>? response;
+ Task<BaseCommand>? responseTask;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.Command!.AsBaseCommand();
- response = _requestResponseHandler.Outgoing(baseCommand);
- var sequence = Serializer.Serialize(baseCommand,
command.Metadata!, command.Payload);
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
- return await response.ConfigureAwait(false);
+ return await responseTask.ConfigureAwait(false);
}
- public async Task<BaseCommand> Send(CommandGetOrCreateSchema command,
CancellationToken cancellationToken)
+ public async Task<BaseCommand> Send(CommandCloseProducer command,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -193,58 +183,77 @@ namespace DotPulsar.Internal
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.AsBaseCommand();
- responseTask = _requestResponseHandler.Outgoing(baseCommand);
- var sequence = Serializer.Serialize(baseCommand);
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
return await responseTask.ConfigureAwait(false);
}
- private async Task<BaseCommand> SendRequestResponse(BaseCommand
command, CancellationToken cancellationToken)
+ public async Task<BaseCommand> Send(CommandCloseConsumer command,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
- Task<BaseCommand>? response;
+ Task<BaseCommand>? responseTask;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- response = _requestResponseHandler.Outgoing(command);
- var sequence = Serializer.Serialize(command);
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
await _stream.Send(sequence).ConfigureAwait(false);
}
- return await response.ConfigureAwait(false);
+ return await responseTask.ConfigureAwait(false);
}
- private async Task Send(BaseCommand command, CancellationToken
cancellationToken)
+ public async Task<BaseCommand> Send(SendPackage command,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
+ Task<BaseCommand>? responseTask;
+
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var sequence = Serializer.Serialize(command);
+ responseTask = _channelManager.Outgoing(command.Command!);
+ var sequence =
Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!,
command.Payload);
await _stream.Send(sequence).ConfigureAwait(false);
}
+
+ return await responseTask.ConfigureAwait(false);
}
- public async Task ProcessIncommingFrames()
+ public async Task<BaseCommand> Send(CommandGetOrCreateSchema command,
CancellationToken cancellationToken)
{
- await Task.Yield();
+ ThrowIfDisposed();
- var lookup = new EnumLookup<BaseCommand.Type,
Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
+ Task<BaseCommand>? responseTask;
- lookup.Set(BaseCommand.Type.CloseConsumer, cmd =>
_channelManager.Incoming(cmd.CloseConsumer));
- lookup.Set(BaseCommand.Type.ActiveConsumerChange, cmd =>
_channelManager.Incoming(cmd.ActiveConsumerChange));
- lookup.Set(BaseCommand.Type.ReachedEndOfTopic, cmd =>
_channelManager.Incoming(cmd.ReachedEndOfTopic));
- lookup.Set(BaseCommand.Type.Ping, cmd =>
_pingPongHandler.GotPing());
- lookup.Set(BaseCommand.Type.CloseProducer, cmd =>
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- _channelManager.Incoming(cmd.CloseProducer);
- _requestResponseHandler.Incoming(cmd.CloseProducer);
- });
-
+ responseTask = _channelManager.Outgoing(command);
+ var sequence = Serializer.Serialize(command.AsBaseCommand());
+ await _stream.Send(sequence).ConfigureAwait(false);
+ }
+
+ return await responseTask.ConfigureAwait(false);
+ }
+
+ private async Task Send(BaseCommand command, CancellationToken
cancellationToken)
+ {
+ ThrowIfDisposed();
+
+ var sequence = Serializer.Serialize(command);
+
+ using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
+ {
+ await _stream.Send(sequence).ConfigureAwait(false);
+ }
+ }
+
+ public async Task ProcessIncommingFrames()
+ {
+ await Task.Yield();
try
{
@@ -253,10 +262,18 @@ namespace DotPulsar.Internal
var commandSize = frame.ReadUInt32(0, true);
var command =
Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
- if (command.CommandType == BaseCommand.Type.Message)
- _channelManager.Incoming(command.Message, new
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
- else
- lookup.Get(command.CommandType)(command);
+ switch (command.CommandType)
+ {
+ case BaseCommand.Type.Message:
+ _channelManager.Incoming(command.Message, new
ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
+ break;
+ case BaseCommand.Type.Ping:
+ _pingPongHandler.GotPing();
+ break;
+ default:
+ _channelManager.Incoming(command);
+ break;
+ }
}
}
catch
@@ -271,7 +288,6 @@ namespace DotPulsar.Internal
return;
await _lock.DisposeAsync().ConfigureAwait(false);
- _requestResponseHandler.Dispose();
_channelManager.Dispose();
await _stream.DisposeAsync().ConfigureAwait(false);
}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index b3b7d0a..5cbd632 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -24,57 +24,97 @@ namespace DotPulsar.Internal
{
private readonly RequestId _requestId;
private readonly Awaiter<IRequest, BaseCommand> _requests;
- private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>>
_setRequestId;
private readonly EnumLookup<BaseCommand.Type, Func<BaseCommand,
IRequest>> _getResponseIdentifier;
public RequestResponseHandler()
{
_requestId = new RequestId();
-
_requests = new Awaiter<IRequest, BaseCommand>();
- _setRequestId = new EnumLookup<BaseCommand.Type,
Action<BaseCommand>>(cmd => { });
- _setRequestId.Set(BaseCommand.Type.Seek, cmd => cmd.Seek.RequestId
= _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.Error, cmd =>
cmd.Error.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.Producer, cmd =>
cmd.Producer.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.Lookup, cmd =>
cmd.LookupTopic.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.Subscribe, cmd =>
cmd.Subscribe.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.Unsubscribe, cmd =>
cmd.Unsubscribe.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.CloseConsumer, cmd =>
cmd.CloseConsumer.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.CloseProducer, cmd =>
cmd.CloseProducer.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.GetLastMessageId, cmd =>
cmd.GetLastMessageId.RequestId = _requestId.FetchNext());
- _setRequestId.Set(BaseCommand.Type.GetOrCreateSchema, cmd =>
cmd.GetOrCreateSchema.RequestId = _requestId.FetchNext());
-
- _getResponseIdentifier = new EnumLookup<BaseCommand.Type,
Func<BaseCommand, IRequest>>(cmd => throw new
ArgumentOutOfRangeException(nameof(cmd.CommandType), cmd.CommandType,
"CommandType not supported as request/response type"));
- _getResponseIdentifier.Set(BaseCommand.Type.Connect, cmd => new
ConnectRequest());
+ _getResponseIdentifier = new EnumLookup<BaseCommand.Type,
Func<BaseCommand, IRequest>>(cmd => throw new Exception($"CommandType
'{cmd.CommandType}' not supported as request/response type"));
_getResponseIdentifier.Set(BaseCommand.Type.Connected, cmd => new
ConnectRequest());
- _getResponseIdentifier.Set(BaseCommand.Type.Seek, cmd => new
StandardRequest(cmd.Seek.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.Send, cmd => new
SendRequest(cmd.Send.ProducerId, cmd.Send.SequenceId));
_getResponseIdentifier.Set(BaseCommand.Type.SendError, cmd => new
SendRequest(cmd.SendError.ProducerId, cmd.SendError.SequenceId));
_getResponseIdentifier.Set(BaseCommand.Type.SendReceipt, cmd =>
new SendRequest(cmd.SendReceipt.ProducerId, cmd.SendReceipt.SequenceId));
- _getResponseIdentifier.Set(BaseCommand.Type.Producer, cmd => new
StandardRequest(cmd.Producer.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd
=> new StandardRequest(cmd.ProducerSuccess.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd =>
new StandardRequest(cmd.CloseConsumer.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd =>
new StandardRequest(cmd.CloseProducer.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.Lookup, cmd => new
StandardRequest(cmd.LookupTopic.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd =>
new StandardRequest(cmd.LookupTopicResponse.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.Subscribe, cmd => new
StandardRequest(cmd.Subscribe.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.Unsubscribe, cmd =>
new StandardRequest(cmd.Unsubscribe.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageId, cmd
=> new StandardRequest(cmd.GetLastMessageId.RequestId));
-
_getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd =>
new StandardRequest(cmd.GetLastMessageIdResponse.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchema, cmd
=> new StandardRequest(cmd.GetOrCreateSchema.RequestId));
-
_getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd =>
new StandardRequest(cmd.GetOrCreateSchemaResponse.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd => new
StandardRequest(cmd.Success.RequestId));
- _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd =>
!_requestId.IsPastInitialId() ? new ConnectRequest() : new
StandardRequest(cmd.Error.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.ProducerSuccess, cmd
=> StandardRequest.WithRequestId(cmd.ProducerSuccess.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.CloseConsumer, cmd =>
StandardRequest.WithConsumerId(cmd.CloseConsumer.RequestId,
cmd.CloseConsumer.ConsumerId));
+ _getResponseIdentifier.Set(BaseCommand.Type.CloseProducer, cmd =>
StandardRequest.WithProducerId(cmd.CloseProducer.RequestId,
cmd.CloseProducer.ProducerId));
+ _getResponseIdentifier.Set(BaseCommand.Type.LookupResponse, cmd =>
StandardRequest.WithRequestId(cmd.LookupTopicResponse.RequestId));
+
_getResponseIdentifier.Set(BaseCommand.Type.GetLastMessageIdResponse, cmd =>
StandardRequest.WithRequestId(cmd.GetLastMessageIdResponse.RequestId));
+
_getResponseIdentifier.Set(BaseCommand.Type.GetOrCreateSchemaResponse, cmd =>
StandardRequest.WithRequestId(cmd.GetOrCreateSchemaResponse.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Success, cmd =>
StandardRequest.WithRequestId(cmd.Success.RequestId));
+ _getResponseIdentifier.Set(BaseCommand.Type.Error, cmd =>
!_requestId.IsPastInitialId() ? new ConnectRequest() :
StandardRequest.WithRequestId(cmd.Error.RequestId));
}
public void Dispose()
- => _requests.Dispose();
+ => _requests.Dispose();
+
+ public Task<BaseCommand> Outgoing(CommandProducer command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ var request = StandardRequest.WithProducerId(command.RequestId,
command.ProducerId);
+ return _requests.CreateTask(request);
+ }
+
+ public Task<BaseCommand> Outgoing(CommandCloseProducer command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ var request = StandardRequest.WithProducerId(command.RequestId,
command.ProducerId);
+ return _requests.CreateTask(request);
+ }
+
+ public Task<BaseCommand> Outgoing(CommandSubscribe command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ var request = StandardRequest.WithConsumerId(command.RequestId,
command.ConsumerId);
+ return _requests.CreateTask(request);
+ }
+
+ public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ var request = StandardRequest.WithConsumerId(command.RequestId,
command.ConsumerId);
+ return _requests.CreateTask(request);
+ }
+
+ public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ var request = StandardRequest.WithConsumerId(command.RequestId,
command.ConsumerId);
+ return _requests.CreateTask(request);
+ }
+
+ public Task<BaseCommand> Outgoing(CommandSend command)
+ {
+ var request = new SendRequest(command.ProducerId,
command.SequenceId);
+ return _requests.CreateTask(request);
+ }
- public Task<BaseCommand> Outgoing(BaseCommand command)
+ public Task<BaseCommand> Outgoing(CommandGetOrCreateSchema command)
{
- _setRequestId.Get(command.CommandType)(command);
- return
_requests.CreateTask(_getResponseIdentifier.Get(command.CommandType)(command));
+ command.RequestId = _requestId.FetchNext();
+ var request = StandardRequest.WithRequestId(command.RequestId);
+ return _requests.CreateTask(request);
+ }
+
+ public Task<BaseCommand> Outgoing(CommandConnect _1)
+ => _requests.CreateTask(new ConnectRequest());
+
+ public Task<BaseCommand> Outgoing(CommandLookupTopic command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ return
_requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
+ }
+
+ public Task<BaseCommand> Outgoing(CommandSeek command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ return
_requests.CreateTask(StandardRequest.WithConsumerId(command.RequestId,
command.ConsumerId));
+ }
+
+ public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
+ {
+ command.RequestId = _requestId.FetchNext();
+ return
_requests.CreateTask(StandardRequest.WithRequestId(command.RequestId));
}
public void Incoming(BaseCommand command)
@@ -85,12 +125,22 @@ namespace DotPulsar.Internal
_requests.SetResult(identifier, command);
}
+ public void Incoming(CommandCloseConsumer command)
+ {
+ var requests = _requests.Keys;
+ foreach (var request in requests)
+ {
+ if (request.SenderIsConsumer(command.ConsumerId))
+ _requests.Cancel(request);
+ }
+ }
+
public void Incoming(CommandCloseProducer command)
{
var requests = _requests.Keys;
foreach (var request in requests)
{
- if (request is SendRequest sendRequest && sendRequest.ProducerId == command.ProducerId)
+ if (request.SenderIsProducer(command.ProducerId))
_requests.Cancel(request);
}
}
diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
index ceca386..5ab0e20 100644
--- a/src/DotPulsar/Internal/Requests/ConnectRequest.cs
+++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs
@@ -19,6 +19,12 @@ namespace DotPulsar.Internal.Requests
public struct ConnectRequest : IRequest
{
+ public bool SenderIsConsumer(ulong consumerId)
+ => false;
+
+ public bool SenderIsProducer(ulong producerId)
+ => false;
+
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else
diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs b/src/DotPulsar/Internal/Requests/SendRequest.cs
index 5f15fd2..36498b6 100644
--- a/src/DotPulsar/Internal/Requests/SendRequest.cs
+++ b/src/DotPulsar/Internal/Requests/SendRequest.cs
@@ -20,15 +20,21 @@ namespace DotPulsar.Internal.Requests
public struct SendRequest : IRequest
{
- public ulong ProducerId { get; }
- public ulong SequenceId { get; }
+ private readonly ulong _producerId;
+ private readonly ulong _sequenceId;
public SendRequest(ulong producerId, ulong sequenceId)
{
- ProducerId = producerId;
- SequenceId = sequenceId;
+ _producerId = producerId;
+ _sequenceId = sequenceId;
}
+ public bool SenderIsConsumer(ulong consumerId)
+ => false;
+
+ public bool SenderIsProducer(ulong producerId)
+ => _producerId == producerId;
+
#if NETSTANDARD2_0
public bool Equals(IRequest other)
#else
@@ -36,12 +42,12 @@ namespace DotPulsar.Internal.Requests
#endif
{
if (other is SendRequest request)
- return ProducerId.Equals(request.ProducerId) && SequenceId.Equals(request.SequenceId);
+ return _producerId.Equals(request._producerId) && _sequenceId.Equals(request._sequenceId);
return false;
}
public override int GetHashCode()
- => HashCode.Combine(ProducerId, SequenceId);
+ => HashCode.Combine(_producerId, _sequenceId);
}
}
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs b/src/DotPulsar/Internal/Requests/StandardRequest.cs
index 61b656e..666b227 100644
--- a/src/DotPulsar/Internal/Requests/StandardRequest.cs
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -20,10 +20,31 @@ namespace DotPulsar.Internal.Requests
public struct StandardRequest : IRequest
{
- public ulong RequestId { get; }
+ private readonly ulong _requestId;
+ private readonly ulong? _consumerId;
+ private readonly ulong? _producerId;
- public StandardRequest(ulong requestId)
- => RequestId = requestId;
+ private StandardRequest(ulong requestId, ulong? consumerId, ulong? producerId)
+ {
+ _requestId = requestId;
+ _consumerId = consumerId;
+ _producerId = producerId;
+ }
+
+ public static StandardRequest WithRequestId(ulong requestId)
+ => new(requestId, null, null);
+
+ public static StandardRequest WithConsumerId(ulong requestId, ulong consumerId)
+ => new(requestId, consumerId, null);
+
+ public static StandardRequest WithProducerId(ulong requestId, ulong producerId)
+ => new (requestId, null, producerId);
+
+ public bool SenderIsConsumer(ulong consumerId)
+ => _consumerId.HasValue && _consumerId.Value == consumerId;
+
+ public bool SenderIsProducer(ulong producerId)
+ => _producerId.HasValue && _producerId.Value == producerId;
#if NETSTANDARD2_0
public bool Equals(IRequest other)
@@ -32,12 +53,12 @@ namespace DotPulsar.Internal.Requests
#endif
{
if (other is StandardRequest request)
- return RequestId.Equals(request.RequestId);
+ return _requestId.Equals(request._requestId);
return false;
}
public override int GetHashCode()
- => HashCode.Combine(RequestId);
+ => HashCode.Combine(_requestId);
}
}