This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 65cb69f Partitioned Topic Consumer Support (#146)
65cb69f is described below
commit 65cb69f2f9c33bcd46d76db5738d57f8dd8046df
Author: Thomas O'Neill <[email protected]>
AuthorDate: Fri May 19 06:40:54 2023 -0400
Partitioned Topic Consumer Support (#146)
* feat(consumer): Add support for consuming partitioned topics
* chore(deps): Add Nito.AsyncEx package reference
* chore: Fix documentation, add new line to end of file
---------
Co-authored-by: Thomas O'Neill <[email protected]>
---
CHANGELOG.md | 4 +
samples/Consuming/Program.cs | 5 +-
src/DotPulsar/ConsumerState.cs | 7 +-
src/DotPulsar/DotPulsar.csproj | 1 +
.../Exceptions/ConsumerNotActiveException.cs | 21 +
src/DotPulsar/Internal/Consumer.cs | 495 ++++++++++++++++-----
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 1 +
.../Internal/{Consumer.cs => SubConsumer.cs} | 187 ++++----
src/DotPulsar/PulsarClient.cs | 33 +-
tests/DotPulsar.Tests/ConsumerTests.cs | 26 ++
10 files changed, 541 insertions(+), 239 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cfd3fab..f701407 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [UnReleased]
+
+- Support for consuming partitioned topics
+
## [2.12.0] - ?
### Added
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index dd71b35..9cbddbd 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -52,8 +52,9 @@ internal static class Program
{
await foreach (var message in consumer.Messages(cancellationToken))
{
- Console.WriteLine($"Received: {message.Value()}");
- await consumer.Acknowledge(message, cancellationToken);
+ Console.WriteLine($"Received: {message?.Value()}");
+ if (message is not null)
+ await consumer.Acknowledge(message, cancellationToken);
}
}
catch (OperationCanceledException) { }
diff --git a/src/DotPulsar/ConsumerState.cs b/src/DotPulsar/ConsumerState.cs
index d251e22..7dfd9ed 100644
--- a/src/DotPulsar/ConsumerState.cs
+++ b/src/DotPulsar/ConsumerState.cs
@@ -52,5 +52,10 @@ public enum ConsumerState : byte
/// <summary>
/// The consumer has unsubscribed. This is a final state.
/// </summary>
- Unsubscribed
+ Unsubscribed,
+
+ /// <summary>
+ /// When the topic is a partition topic and some of the SubConsumers are
active.
+ /// </summary>
+ PartiallyActive
}
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index fb5b330..bcb70a1 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -25,6 +25,7 @@
<PackageReference Include="HashDepot" Version="2.0.3" />
<PackageReference Include="Microsoft.Extensions.ObjectPool"
Version="7.0.5" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1"
PrivateAssets="All" />
+ <PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
<PackageReference Include="protobuf-net" Version="3.2.16" />
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
</ItemGroup>
diff --git a/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
b/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
new file mode 100644
index 0000000..7a2c1db
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+ public sealed class ConsumerNotActiveException : DotPulsarException
+ {
+ public ConsumerNotActiveException(string message) : base(message) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 9cb3327..7558a13 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -16,27 +16,35 @@ namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+using DotPulsar.Extensions;
using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
+using DotPulsar.Internal.Compression;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
-using Microsoft.Extensions.ObjectPool;
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+using Nito.AsyncEx;
-public sealed class Consumer<TMessage> : IContainsChannel, IConsumer<TMessage>
+public sealed class Consumer<TMessage> : IConsumer<TMessage>, IRegisterEvent
{
- private readonly Guid _correlationId;
- private readonly IRegisterEvent _eventRegister;
- private IConsumerChannel<TMessage> _channel;
- private readonly ObjectPool<CommandAck> _commandAckPool;
private readonly IExecute _executor;
- private readonly IStateChanged<ConsumerState> _state;
- private readonly IConsumerChannelFactory<TMessage> _factory;
+ private readonly StateManager<ConsumerState> _state;
+ private readonly ConsumerOptions<TMessage> _options;
+ private readonly ProcessManager _processManager;
+ private readonly IHandleException _exceptionHandler;
+ private readonly IConnectionPool _connectionPool;
+ private readonly CancellationTokenSource _cts;
+ private readonly ConcurrentDictionary<string, IConsumer<TMessage>>
_consumers;
+ private readonly AsyncReaderWriterLock _lock;
+ private ConcurrentQueue<IMessage<TMessage>> _messagesQueue;
private int _isDisposed;
+ private int _consumerCount;
+ private uint _numberOfPartitions;
+ private bool _isPartitioned;
private Exception? _faultException;
public Uri ServiceUrl { get; }
@@ -44,29 +52,31 @@ public sealed class Consumer<TMessage> : IContainsChannel,
IConsumer<TMessage>
public string Topic { get; }
public Consumer(
- Guid correlationId,
Uri serviceUrl,
- string subscriptionName,
- string topic,
- IRegisterEvent eventRegister,
- IConsumerChannel<TMessage> initialChannel,
- IExecute executor,
- IStateChanged<ConsumerState> state,
- IConsumerChannelFactory<TMessage> factory)
- {
- _correlationId = correlationId;
+ ConsumerOptions<TMessage> options,
+ ProcessManager processManager,
+ IHandleException exceptionHandler,
+ IConnectionPool connectionPool)
+ {
+ Topic = options.Topic;
ServiceUrl = serviceUrl;
- SubscriptionName = subscriptionName;
- Topic = topic;
- _eventRegister = eventRegister;
- _channel = initialChannel;
- _executor = executor;
- _state = state;
- _factory = factory;
- _commandAckPool = new DefaultObjectPool<CommandAck>(new
DefaultPooledObjectPolicy<CommandAck>());
+ SubscriptionName = options.SubscriptionName;
+
+ _options = options;
+ _processManager = processManager;
+ _exceptionHandler = exceptionHandler;
+ _connectionPool = connectionPool;
+
+ _state = new StateManager<ConsumerState>(ConsumerState.Disconnected,
ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+ _cts = new CancellationTokenSource();
+ _executor = new Executor(Guid.Empty, this, _exceptionHandler);
+ _consumers = new ConcurrentDictionary<string, IConsumer<TMessage>>();
+ _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
_isDisposed = 0;
- _eventRegister.Register(new ConsumerCreated(_correlationId));
+ _lock = new AsyncReaderWriterLock();
+
+ _ = Setup();
}
public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state,
CancellationToken cancellationToken)
@@ -86,140 +96,407 @@ public sealed class Consumer<TMessage> :
IContainsChannel, IConsumer<TMessage>
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ConsumerDisposed(_correlationId));
- await DisposeChannel().ConfigureAwait(false);
+ _cts.Cancel();
+ _cts.Dispose();
+
+ _state.SetState(ConsumerState.Closed);
+
+ using (_lock.ReaderLock())
+ {
+ foreach (var consumer in _consumers.Values)
+ {
+ await consumer.DisposeAsync().ConfigureAwait(false);
+ }
+ }
}
- private async ValueTask DisposeChannel()
+ private async Task Setup()
{
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
- }
+ await Task.Yield();
- public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
+ try
+ {
+ await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+ }
+ catch (Exception exception)
+ {
+ if (_cts.IsCancellationRequested)
+ return;
- public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
- => await InternalAcknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
+ _faultException = exception;
+ _state.SetState(ConsumerState.Faulted);
+ }
+ }
+ private async Task Monitor()
+ {
+ _numberOfPartitions = await GetNumberOfPartitions(Topic,
_cts.Token).ConfigureAwait(false);
+ _isPartitioned = _numberOfPartitions != 0;
+ var monitoringTasks = new List<Task<ConsumerStateChanged>>();
- public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
- => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
+ using (_lock.ReaderLock())
+ {
+ if (_isPartitioned)
+ {
+
+ for (var partition = 0; partition < _numberOfPartitions;
++partition)
+ {
+ var partitionedTopicName =
getPartitonedTopicName(partition);
+
+ var consumer = CreateSubConsumer(partitionedTopicName);
+ _ = _consumers.TryAdd(partitionedTopicName, consumer);
+
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected,
_cts.Token).AsTask());
+ }
+ }
+
+ else
+ {
+ var consumer = CreateSubConsumer(Topic);
+ _ = _consumers.TryAdd(Topic, consumer);
+
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected,
_cts.Token).AsTask());
+ }
+
+ Interlocked.Exchange(ref _consumerCount, monitoringTasks.Count);
+ }
+ var activeConsumers = 0;
+ while (true)
+ {
+ await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
+
+ for (var i = 0; i < monitoringTasks.Count; ++i)
+ {
+ var task = monitoringTasks[i];
+ if (!task.IsCompleted)
+ continue;
+ var state = task.Result.ConsumerState;
+ switch (state)
+ {
+ case ConsumerState.Active:
+ ++activeConsumers;
+ break;
+ case ConsumerState.Disconnected:
+ --activeConsumers;
+ break;
+ case ConsumerState.ReachedEndOfTopic:
+ _state.SetState(ConsumerState.ReachedEndOfTopic);
+ return;
+ case ConsumerState.Faulted:
+ _state.SetState(ConsumerState.Faulted);
+ return;
+ case ConsumerState.Unsubscribed:
+ _state.SetState(ConsumerState.Unsubscribed);
+ return;
+ }
+
+ monitoringTasks[i] =
task.Result.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
+ }
+
+ if (activeConsumers == 0)
+ _state.SetState(ConsumerState.Disconnected);
+ else if (activeConsumers == monitoringTasks.Count)
+ _state.SetState(ConsumerState.Active);
+ else
+ _state.SetState(ConsumerState.PartiallyActive);
+ }
+ }
- public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
+ private SubConsumer<TMessage> CreateSubConsumer(string topic)
{
- var command = new CommandRedeliverUnacknowledgedMessages();
- command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
- await _executor.Execute(() =>
InternalRedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
+ var correlationId = Guid.NewGuid();
+ var consumerName = _options.ConsumerName ??
$"Consumer-{correlationId:N}";
- public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
- => await
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(),
cancellationToken).ConfigureAwait(false);
+ var subscribe = new CommandSubscribe
+ {
+ ConsumerName = consumerName,
+ InitialPosition = (CommandSubscribe.InitialPositionType)
_options.InitialPosition,
+ PriorityLevel = _options.PriorityLevel,
+ ReadCompacted = _options.ReadCompacted,
+ ReplicateSubscriptionState = _options.ReplicateSubscriptionState,
+ Subscription = _options.SubscriptionName,
+ Topic = topic,
+ Type = (CommandSubscribe.SubType) _options.SubscriptionType
+ };
+
+ foreach (var property in _options.SubscriptionProperties)
+ {
+ var keyValue = new KeyValue { Key = property.Key, Value =
property.Value };
+ subscribe.SubscriptionProperties.Add(keyValue);
+ }
- public async ValueTask Unsubscribe(CancellationToken cancellationToken)
+ var messagePrefetchCount = _options.MessagePrefetchCount;
+ var messageFactory = new MessageFactory<TMessage>(_options.Schema);
+ var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
+ var decompressorFactories =
CompressionFactories.DecompressorFactories();
+
+ var factory = new ConsumerChannelFactory<TMessage>(correlationId,
_processManager, _connectionPool, subscribe, messagePrefetchCount,
batchHandler, messageFactory, decompressorFactories);
+ var stateManager = new
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed,
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+ var initialChannel = new NotReadyChannel<TMessage>();
+ var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
+ var consumer = new SubConsumer<TMessage>(correlationId, ServiceUrl,
_options.SubscriptionName, topic, _processManager, initialChannel, executor,
stateManager, factory);
+ var process = new ConsumerProcess(correlationId, stateManager,
consumer, _options.SubscriptionType == SubscriptionType.Failover);
+ _processManager.Add(process);
+ process.Start();
+ return consumer;
+ }
+
+ private async Task<uint> GetNumberOfPartitions(string topic,
CancellationToken cancellationToken)
{
- var unsubscribe = new CommandUnsubscribe();
- await _executor.Execute(() => InternalUnsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ var connection = await _connectionPool.FindConnectionForTopic(topic,
cancellationToken).ConfigureAwait(false);
+ var commandPartitionedMetadata = new
PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
+ var response = await connection.Send(commandPartitionedMetadata,
cancellationToken).ConfigureAwait(false);
+
+
response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
+
+ if (response.PartitionMetadataResponse.Response ==
PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+ response.PartitionMetadataResponse.Throw();
+
+ return response.PartitionMetadataResponse.Partitions;
}
- public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
+ private void ThrowIfDisposed()
+ {
+ if (_isDisposed != 0)
+ throw new ConsumerDisposedException(GetType().FullName!);
+ }
+ public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken = default)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
+
+ var sourceTopic = Topic;
+ if (_isPartitioned)
+ {
+ sourceTopic = getPartitonedTopicName(messageId.Partition);
+ }
+ await _executor.Execute(() =>
+ {
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ return _consumers[sourceTopic].Acknowledge(messageId,
cancellationToken);
+ }
+ }, cancellationToken)
+ .ConfigureAwait(false);
}
- public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
+ public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
+
+ await _executor.Execute(() =>
+ {
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ var sourceTopic = Topic;
+ if (_isPartitioned)
+ {
+ sourceTopic =
getPartitonedTopicName(messageId.Partition);
+ }
+ return
_consumers[sourceTopic].AcknowledgeCumulative(messageId, cancellationToken);
+ }
+ }, cancellationToken)
+ .ConfigureAwait(false);
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
+ ThrowIfDisposed();
+
var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() =>
GetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
- private void Guard()
+ private async ValueTask<MessageId>
GetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
{
- if (_isDisposed != 0)
- throw new ConsumerDisposedException(GetType().FullName!);
+ ThrowIfNotActive();
- if (_faultException is not null)
- throw new ConsumerFaultedException(_faultException);
+ if (_isPartitioned)
+ {
+ throw new NotImplementedException("GetLastMessageId is not
implemented for partitioned topics");
+ }
+ using (_lock.ReaderLock())
+ {
+ return await
_consumers.First().Value.GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken = default)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
-
- _channel = channel;
+ return await _executor.Execute(() =>
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
- public async ValueTask ChannelFaulted(Exception exception)
+ private async ValueTask<IMessage<TMessage>>
ReceiveMessage(CancellationToken cancellationToken)
{
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ if (_messagesQueue.TryDequeue(out var message))
+ {
+ return message;
+ }
+ var cts = new CancellationTokenSource();
+ var linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var done = false;
+
+ Task<IMessage<TMessage>>[] receiveTasks =
_consumers.Values.Select(consumer =>
consumer.Receive(linkedCts.Token).AsTask()).ToArray();
+ await Task.WhenAny(receiveTasks).ConfigureAwait(false);
+
+ try
+ {
+ receiveTasks.Where(t => t.IsCompleted).ToList().ForEach(t
=>
+ {
+ if (t.Result == null)
+ {
+ return;
+ }
+
+ done = true;
+ _messagesQueue.Enqueue(t.Result);
+ }
+ );
+ }
+ catch (Exception exception)
+ {
+ if (linkedCts.IsCancellationRequested)
+ {
+ cts.Cancel();
+ }
+ else
+ {
+ throw exception;
+ }
+ }
+ if (done)
+ {
+ break;
+ }
+ }
+ cts.Cancel();
+ cts.Dispose();
+ _messagesQueue.TryDequeue(out var result);
+ return result;
+ }
}
- private async ValueTask InternalAcknowledge(CommandAck command,
CancellationToken cancellationToken)
+ public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken = default)
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
+
+ await _executor.Execute<ValueTask>(async () =>
+ {
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ var tasks = messageIds.Select(n =>
+ {
+ var sourceTopic = Topic;
+ if (_isPartitioned)
+ {
+ sourceTopic = getPartitonedTopicName(n.Partition);
+ }
+ return
_consumers[getPartitonedTopicName(n.Partition)].RedeliverUnacknowledgedMessages(cancellationToken).AsTask();
+ });
+ await Task.WhenAll(tasks).ConfigureAwait(false);
+ }
+ }, cancellationToken).ConfigureAwait(false);
}
- private async ValueTask
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages
command, CancellationToken cancellationToken)
+ public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken = default)
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
+
+ await _executor.Execute<ValueTask>(async () =>
+ {
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ var tasks = _consumers.Values.Select(consumer =>
+
consumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask()
+ );
+ await Task.WhenAll(tasks).ConfigureAwait(false);
+ }
+ }, cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
+
+ await _executor.Execute<ValueTask>(async () =>
+ {
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ if (messageId.Equals(null))
+ {
+ throw new ArgumentException("Illegal messageId cannot be
null");
+ }
+
+ var tasks = _consumers.Values.Select(consumer =>
+ consumer.Seek(messageId, cancellationToken).AsTask()
+ );
+ await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
+ _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
+ }
+ }, cancellationToken).ConfigureAwait(false);
}
- private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
+ public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken = default)
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
+
+ await _executor.Execute<ValueTask>(async () =>
+ {
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ var tasks = _consumers.Values.Select(consumer =>
+ consumer.Seek(publishTime, cancellationToken).AsTask()
+ );
+ await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
+ _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
+ }
+ }, cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
+ public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ ThrowIfDisposed();
+
+ await _executor.Execute<ValueTask>(async () =>
+ {
+ ThrowIfNotActive();
+
+ using (_lock.ReaderLock())
+ {
+ var tasks = _consumers.Values.Select(consumer =>
+ consumer.Unsubscribe(cancellationToken).AsTask()
+ );
+ await Task.WhenAll(tasks).ConfigureAwait(false);
+ }
+ }, cancellationToken).ConfigureAwait(false);
}
- private async ValueTask InternalUnsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
+ private string getPartitonedTopicName(int partitionNumber)
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ return $"{Topic}-partition-{partitionNumber}";
}
- private async ValueTask InternalAcknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
+ private void ThrowIfNotActive()
{
- var commandAck = _commandAckPool.Get();
- commandAck.Type = ackType;
- if (commandAck.MessageIds.Count == 0)
- commandAck.MessageIds.Add(messageId.ToMessageIdData());
- else
- commandAck.MessageIds[0].MapFrom(messageId);
-
- try
- {
- await _executor.Execute(() => InternalAcknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- _commandAckPool.Return(commandAck);
- }
+ if (_state.CurrentState != ConsumerState.Active)
+ throw new ConsumerNotActiveException("The consumer is not yet
activated.");
}
+
+ public void Register(IEvent @event) { }
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index edb90fe..727d8a7 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -44,6 +44,7 @@ public sealed class DefaultExceptionHandler : IHandleException
ServiceNotReadyException _ => FaultAction.Retry,
MetadataException _ => FaultAction.Rethrow,
ConsumerNotFoundException _ => FaultAction.Retry,
+ ConsumerNotActiveException _ => FaultAction.Retry,
ConsumerBusyException _ => FaultAction.Retry,
ProducerBusyException _ => FaultAction.Retry,
ProducerFencedException _ => FaultAction.Rethrow,
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/SubConsumer.cs
similarity index 86%
copy from src/DotPulsar/Internal/Consumer.cs
copy to src/DotPulsar/Internal/SubConsumer.cs
index 9cb3327..bbc9bd3 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -27,23 +27,23 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-public sealed class Consumer<TMessage> : IContainsChannel, IConsumer<TMessage>
+public sealed class SubConsumer<TMessage> : IContainsChannel,
IConsumer<TMessage>
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
private IConsumerChannel<TMessage> _channel;
- private readonly ObjectPool<CommandAck> _commandAckPool;
private readonly IExecute _executor;
private readonly IStateChanged<ConsumerState> _state;
private readonly IConsumerChannelFactory<TMessage> _factory;
- private int _isDisposed;
private Exception? _faultException;
+ private readonly ObjectPool<CommandAck> _commandAckPool;
public Uri ServiceUrl { get; }
public string SubscriptionName { get; }
public string Topic { get; }
+ private int _isDisposed;
- public Consumer(
+ public SubConsumer(
Guid correlationId,
Uri serviceUrl,
string subscriptionName,
@@ -52,7 +52,8 @@ public sealed class Consumer<TMessage> : IContainsChannel,
IConsumer<TMessage>
IConsumerChannel<TMessage> initialChannel,
IExecute executor,
IStateChanged<ConsumerState> state,
- IConsumerChannelFactory<TMessage> factory)
+ IConsumerChannelFactory<TMessage> factory
+ )
{
_correlationId = correlationId;
ServiceUrl = serviceUrl;
@@ -63,127 +64,129 @@ public sealed class Consumer<TMessage> :
IContainsChannel, IConsumer<TMessage>
_executor = executor;
_state = state;
_factory = factory;
- _commandAckPool = new DefaultObjectPool<CommandAck>(new
DefaultPooledObjectPolicy<CommandAck>());
_isDisposed = 0;
+ _commandAckPool = new DefaultObjectPool<CommandAck>(new
DefaultPooledObjectPolicy<CommandAck>());
_eventRegister.Register(new ConsumerCreated(_correlationId));
}
- public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state,
CancellationToken cancellationToken)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState
state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
- public bool IsFinalState()
- => _state.IsFinalState();
+ public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
+ => await InternalAcknowledge(messageId,
CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
- public bool IsFinalState(ConsumerState state)
- => _state.IsFinalState(state);
- public async ValueTask DisposeAsync()
+ private async ValueTask InternalAcknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
{
- if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
- return;
+ var commandAck = _commandAckPool.Get();
+ commandAck.Type = ackType;
+ if (commandAck.MessageIds.Count == 0)
+ commandAck.MessageIds.Add(messageId.ToMessageIdData());
+ else
+ commandAck.MessageIds[0].MapFrom(messageId);
- _eventRegister.Register(new ConsumerDisposed(_correlationId));
- await DisposeChannel().ConfigureAwait(false);
+ try
+ {
+ await _executor.Execute(() => InternalAcknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ _commandAckPool.Return(commandAck);
+ }
}
- private async ValueTask DisposeChannel()
+ private async ValueTask InternalAcknowledge(CommandAck command,
CancellationToken cancellationToken)
{
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
- public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
- => await InternalAcknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
-
public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
=> await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
- public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
+ public async ValueTask ChannelFaulted(Exception exception)
{
- var command = new CommandRedeliverUnacknowledgedMessages();
- command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
- await _executor.Execute(() =>
InternalRedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ _faultException = exception;
+ await DisposeChannel().ConfigureAwait(false);
}
- public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
- => await
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(),
cancellationToken).ConfigureAwait(false);
+ public async ValueTask CloseChannel(CancellationToken cancellationToken)
+ => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
- public async ValueTask Unsubscribe(CancellationToken cancellationToken)
+ public async ValueTask DisposeAsync()
{
- var unsubscribe = new CommandUnsubscribe();
- await _executor.Execute(() => InternalUnsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
- }
+ if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+ return;
- public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
- {
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ _eventRegister.Register(new ConsumerDisposed(_correlationId));
+ await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+ await _channel.DisposeAsync().ConfigureAwait(false);
}
- public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
+ public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+
+ var oldChannel = _channel;
+ if (oldChannel is not null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
+
+ _channel = channel;
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
+ Guard();
var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ return await _channel.Send(getLastMessageId,
cancellationToken).ConfigureAwait(false);
}
- private void Guard()
- {
- if (_isDisposed != 0)
- throw new ConsumerDisposedException(GetType().FullName!);
+ public bool IsFinalState()
+ => _state.IsFinalState();
- if (_faultException is not null)
- throw new ConsumerFaultedException(_faultException);
- }
+ public bool IsFinalState(ConsumerState state)
+ => _state.IsFinalState(state);
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
- {
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState
state, CancellationToken cancellationToken = default)
+ => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
- _channel = channel;
- }
+ public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state,
CancellationToken cancellationToken = default)
+ => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
+ => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
- public async ValueTask ChannelFaulted(Exception exception)
+ private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
+ Guard();
+ return await _channel.Receive(cancellationToken).ConfigureAwait(false);
}
- private async ValueTask InternalAcknowledge(CommandAck command,
CancellationToken cancellationToken)
+ public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ var command = new CommandRedeliverUnacknowledgedMessages();
+ command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
+ await _executor.Execute(() =>
InternalRedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
+ public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
+ => await
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(),
cancellationToken).ConfigureAwait(false);
+
private async ValueTask
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages
command, CancellationToken cancellationToken)
{
Guard();
await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken = default)
+ {
+ var seek = new CommandSeek { MessagePublishTime = publishTime };
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
@@ -192,34 +195,28 @@ public sealed class Consumer<TMessage> :
IContainsChannel, IConsumer<TMessage>
await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
+ public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ var unsubscribe = new CommandUnsubscribe();
+ await _executor.Execute(() => Unsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask InternalUnsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
+ private async ValueTask Unsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
+ => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+
+ private void Guard()
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ if (_isDisposed != 0)
+ throw new ConsumerDisposedException(GetType().FullName!);
+
+ if (_faultException is not null)
+ throw new ConsumerFaultedException(_faultException);
}
- private async ValueTask InternalAcknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
+ private async ValueTask DisposeChannel()
{
- var commandAck = _commandAckPool.Get();
- commandAck.Type = ackType;
- if (commandAck.MessageIds.Count == 0)
- commandAck.MessageIds.Add(messageId.ToMessageIdData());
- else
- commandAck.MessageIds[0].MapFrom(messageId);
-
- try
- {
- await _executor.Execute(() => InternalAcknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- _commandAckPool.Return(commandAck);
- }
+ await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+ await _channel.DisposeAsync().ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 66284ef..14085f1 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -89,43 +89,12 @@ public sealed class PulsarClient : IPulsarClient
{
ThrowIfDisposed();
- var correlationId = Guid.NewGuid();
- var consumerName = options.ConsumerName ??
$"Consumer-{correlationId:N}";
- var subscribe = new CommandSubscribe
- {
- ConsumerName = consumerName,
- InitialPosition = (CommandSubscribe.InitialPositionType)
options.InitialPosition,
- PriorityLevel = options.PriorityLevel,
- ReadCompacted = options.ReadCompacted,
- ReplicateSubscriptionState = options.ReplicateSubscriptionState,
- Subscription = options.SubscriptionName,
- Topic = options.Topic,
- Type = (CommandSubscribe.SubType) options.SubscriptionType
- };
+ var consumer = new Consumer<TMessage>(ServiceUrl, options,
_processManager, _exceptionHandler, _connectionPool);
- foreach (var property in options.SubscriptionProperties)
- {
- var keyValue = new KeyValue { Key = property.Key, Value =
property.Value };
- subscribe.SubscriptionProperties.Add(keyValue);
- }
-
- var messagePrefetchCount = options.MessagePrefetchCount;
- var messageFactory = new MessageFactory<TMessage>(options.Schema);
- var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
- var decompressorFactories =
CompressionFactories.DecompressorFactories();
- var factory = new ConsumerChannelFactory<TMessage>(correlationId,
_processManager, _connectionPool, subscribe, messagePrefetchCount,
batchHandler, messageFactory, decompressorFactories);
- var stateManager = new
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed,
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
- var initialChannel = new NotReadyChannel<TMessage>();
- var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
- var consumer = new Consumer<TMessage>(correlationId, ServiceUrl,
options.SubscriptionName, options.Topic, _processManager, initialChannel,
executor, stateManager, factory);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer,
options.StateChangedHandler);
- var process = new ConsumerProcess(correlationId, stateManager,
consumer, options.SubscriptionType == SubscriptionType.Failover);
- _processManager.Add(process);
- process.Start();
return consumer;
}
-
/// <summary>
/// Create a reader.
/// </summary>
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs
b/tests/DotPulsar.Tests/ConsumerTests.cs
index fbf43c8..bbb23f7 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -70,6 +70,32 @@ public class ConsumerTests
consumed.Should().BeEquivalentTo(produced);
}
+ [Fact]
+ public async Task
PartitionConsume_WhenGetLastMessageId_ThenShouldThrowException()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ var topicName = $"consumer-tests-{testRunId}";
+
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+
+ //Act
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .Build();
+
+ var consumer = client.NewConsumer(Schema.ByteArray)
+ .ConsumerName($"consumer-{testRunId}")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName($"subscription-{testRunId}")
+ .Topic(topicName)
+ .Create();
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
+ Assert.ThrowsAsync<NotImplementedException>(async () => await
consumer.GetLastMessageId(cts.Token));
+ }
+
private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer<byte[]> producer, int numberOfMessages,
CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];