This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eb8a54c Spring cleaning (#161)
eb8a54c is described below
commit eb8a54c87391dfb77c2b3444be2df5696a06b456
Author: entvex <[email protected]>
AuthorDate: Fri Jun 2 09:17:01 2023 +0200
Spring cleaning (#161)
* spring cleaning
* spring cleaning
* reformat and cleanup code profile: DotPulsar: Full Cleanup
---------
Co-authored-by: David Jensen <[email protected]>
---
CHANGELOG.md | 2 -
samples/Consuming/Program.cs | 5 +-
samples/Processing/LoggerExtensions.cs | 2 +-
samples/Processing/Worker.cs | 10 +-
src/DotPulsar/ConsumerState.cs | 7 +-
src/DotPulsar/DotPulsar.csproj | 1 -
.../Exceptions/ConsumerNotActiveException.cs | 21 -
src/DotPulsar/Internal/AsyncQueueWithCursor.cs | 2 +-
src/DotPulsar/Internal/Consumer.cs | 495 +++++----------------
src/DotPulsar/Internal/ConsumerProcess.cs | 2 +-
src/DotPulsar/Internal/DefaultExceptionHandler.cs | 1 -
src/DotPulsar/Internal/Producer.cs | 4 +-
src/DotPulsar/Internal/ProducerBuilder.cs | 1 -
src/DotPulsar/Internal/ProducerProcess.cs | 2 +-
src/DotPulsar/Internal/ReaderProcess.cs | 2 +-
src/DotPulsar/Internal/SubConsumer.cs | 222 ---------
src/DotPulsar/PulsarClient.cs | 33 +-
tests/DotPulsar.Tests/ConsumerTests.cs | 26 --
tests/DotPulsar.Tests/ProducerTests.cs | 1 -
19 files changed, 156 insertions(+), 683 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d84fc4c..96ab6c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,8 +6,6 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/)
## [UnReleased]
-- Support for consuming partitioned topics
-
### Fixed
- Fixed issue preventing reader to correctly go into `Faulted` state.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 9cbddbd..dd71b35 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -52,9 +52,8 @@ internal static class Program
{
await foreach (var message in consumer.Messages(cancellationToken))
{
- Console.WriteLine($"Received: {message?.Value()}");
- if (message is not null)
- await consumer.Acknowledge(message, cancellationToken);
+ Console.WriteLine($"Received: {message.Value()}");
+ await consumer.Acknowledge(message, cancellationToken);
}
}
catch (OperationCanceledException) { }
diff --git a/samples/Processing/LoggerExtensions.cs
b/samples/Processing/LoggerExtensions.cs
index e728b12..bc5bf45 100644
--- a/samples/Processing/LoggerExtensions.cs
+++ b/samples/Processing/LoggerExtensions.cs
@@ -74,7 +74,7 @@ public static partial class LoggerExtensions
public static ValueTask ConsumerRegainedConnection(this ILogger logger,
IConsumer consumer, ConsumerState state, string ticketId, CancellationToken
cancellationToken)
{
logger.ConsumerRegainedConnection(consumer.Topic);
- return ValueTask.CompletedTask; // If an alert has been opened, this
is where we can close it again
+ return ValueTask.CompletedTask; // If an alert has been opened, this
is where we can close it again
}
[LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "The
consumer for topic '{topic}' has regained the connection")]
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index e471b29..d49bc64 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -28,7 +28,7 @@ public class Worker : BackgroundService
{
await using var client = PulsarClient.Builder()
.ExceptionHandler(_logger.PulsarClientException) // Optional
- .Build(); // Connecting to pulsar://localhost:6650
+ .Build(); // Connecting to
pulsar://localhost:6650
await using var consumer = client.NewConsumer(Schema.String)
.StateChangedHandler(_logger.ConsumerChangedState) // Optional
@@ -36,10 +36,10 @@ public class Worker : BackgroundService
.Topic("persistent://public/default/mytopic")
.Create();
- _ = consumer.DelayedStateMonitor( // Recommended way of ignoring the
short disconnects expected when working with a distributed system
- ConsumerState.Active, // Operational state
- TimeSpan.FromSeconds(5), // The amount of time allowed in
non-operational state before we act
- _logger.ConsumerLostConnection, // Invoked if we are NOT back in
operational state after 5 seconds
+ _ = consumer.DelayedStateMonitor( // Recommended way of ignoring
the short disconnects expected when working with a distributed system
+ ConsumerState.Active, // Operational state
+ TimeSpan.FromSeconds(5), // The amount of time allowed
in non-operational state before we act
+ _logger.ConsumerLostConnection, // Invoked if we are NOT back
in operational state after 5 seconds
_logger.ConsumerRegainedConnection, // Invoked when we are in
operational state again
cancellationToken);
diff --git a/src/DotPulsar/ConsumerState.cs b/src/DotPulsar/ConsumerState.cs
index 7dfd9ed..d251e22 100644
--- a/src/DotPulsar/ConsumerState.cs
+++ b/src/DotPulsar/ConsumerState.cs
@@ -52,10 +52,5 @@ public enum ConsumerState : byte
/// <summary>
/// The consumer has unsubscribed. This is a final state.
/// </summary>
- Unsubscribed,
-
- /// <summary>
- /// When the topic is a partition topic and some of the SubConsumers are
active.
- /// </summary>
- PartiallyActive
+ Unsubscribed
}
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 0675118..64099f1 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -25,7 +25,6 @@
<PackageReference Include="HashDepot" Version="2.0.3" />
<PackageReference Include="Microsoft.Extensions.ObjectPool"
Version="7.0.5" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1"
PrivateAssets="All" />
- <PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
<PackageReference Include="protobuf-net" Version="3.2.16" />
<PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
</ItemGroup>
diff --git a/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
b/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
deleted file mode 100644
index 7a2c1db..0000000
--- a/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Exceptions
-{
- public sealed class ConsumerNotActiveException : DotPulsarException
- {
- public ConsumerNotActiveException(string message) : base(message) { }
- }
-}
diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
index f7f3824..4855a84 100644
--- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
+++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
@@ -179,7 +179,7 @@ public sealed class AsyncQueueWithCursor<T> :
IAsyncDisposable where T : IDispos
}
finally
{
- bool shouldThrow = _cursorNextItemTcs is not null &&
_cursorNextItemTcs.Task.IsCanceled;
+ var shouldThrow = _cursorNextItemTcs is not null &&
_cursorNextItemTcs.Task.IsCanceled;
lock (_queue)
{
diff --git a/src/DotPulsar/Internal/Consumer.cs
b/src/DotPulsar/Internal/Consumer.cs
index 7558a13..9cb3327 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -16,35 +16,27 @@ namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
-using DotPulsar.Extensions;
using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Compression;
+using DotPulsar.Internal.Events;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
+using Microsoft.Extensions.ObjectPool;
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-using Nito.AsyncEx;
-public sealed class Consumer<TMessage> : IConsumer<TMessage>, IRegisterEvent
+public sealed class Consumer<TMessage> : IContainsChannel, IConsumer<TMessage>
{
+ private readonly Guid _correlationId;
+ private readonly IRegisterEvent _eventRegister;
+ private IConsumerChannel<TMessage> _channel;
+ private readonly ObjectPool<CommandAck> _commandAckPool;
private readonly IExecute _executor;
- private readonly StateManager<ConsumerState> _state;
- private readonly ConsumerOptions<TMessage> _options;
- private readonly ProcessManager _processManager;
- private readonly IHandleException _exceptionHandler;
- private readonly IConnectionPool _connectionPool;
- private readonly CancellationTokenSource _cts;
- private readonly ConcurrentDictionary<string, IConsumer<TMessage>>
_consumers;
- private readonly AsyncReaderWriterLock _lock;
- private ConcurrentQueue<IMessage<TMessage>> _messagesQueue;
+ private readonly IStateChanged<ConsumerState> _state;
+ private readonly IConsumerChannelFactory<TMessage> _factory;
private int _isDisposed;
- private int _consumerCount;
- private uint _numberOfPartitions;
- private bool _isPartitioned;
private Exception? _faultException;
public Uri ServiceUrl { get; }
@@ -52,31 +44,29 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>, IRegisterEvent
public string Topic { get; }
public Consumer(
+ Guid correlationId,
Uri serviceUrl,
- ConsumerOptions<TMessage> options,
- ProcessManager processManager,
- IHandleException exceptionHandler,
- IConnectionPool connectionPool)
- {
- Topic = options.Topic;
+ string subscriptionName,
+ string topic,
+ IRegisterEvent eventRegister,
+ IConsumerChannel<TMessage> initialChannel,
+ IExecute executor,
+ IStateChanged<ConsumerState> state,
+ IConsumerChannelFactory<TMessage> factory)
+ {
+ _correlationId = correlationId;
ServiceUrl = serviceUrl;
- SubscriptionName = options.SubscriptionName;
-
- _options = options;
- _processManager = processManager;
- _exceptionHandler = exceptionHandler;
- _connectionPool = connectionPool;
-
- _state = new StateManager<ConsumerState>(ConsumerState.Disconnected,
ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
- _cts = new CancellationTokenSource();
- _executor = new Executor(Guid.Empty, this, _exceptionHandler);
- _consumers = new ConcurrentDictionary<string, IConsumer<TMessage>>();
- _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
+ SubscriptionName = subscriptionName;
+ Topic = topic;
+ _eventRegister = eventRegister;
+ _channel = initialChannel;
+ _executor = executor;
+ _state = state;
+ _factory = factory;
+ _commandAckPool = new DefaultObjectPool<CommandAck>(new
DefaultPooledObjectPolicy<CommandAck>());
_isDisposed = 0;
- _lock = new AsyncReaderWriterLock();
-
- _ = Setup();
+ _eventRegister.Register(new ConsumerCreated(_correlationId));
}
public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state,
CancellationToken cancellationToken)
@@ -96,407 +86,140 @@ public sealed class Consumer<TMessage> :
IConsumer<TMessage>, IRegisterEvent
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _cts.Cancel();
- _cts.Dispose();
-
- _state.SetState(ConsumerState.Closed);
-
- using (_lock.ReaderLock())
- {
- foreach (var consumer in _consumers.Values)
- {
- await consumer.DisposeAsync().ConfigureAwait(false);
- }
- }
+ _eventRegister.Register(new ConsumerDisposed(_correlationId));
+ await DisposeChannel().ConfigureAwait(false);
}
- private async Task Setup()
+ private async ValueTask DisposeChannel()
{
- await Task.Yield();
-
- try
- {
- await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
- }
- catch (Exception exception)
- {
- if (_cts.IsCancellationRequested)
- return;
-
- _faultException = exception;
- _state.SetState(ConsumerState.Faulted);
- }
- }
- private async Task Monitor()
- {
- _numberOfPartitions = await GetNumberOfPartitions(Topic,
_cts.Token).ConfigureAwait(false);
- _isPartitioned = _numberOfPartitions != 0;
- var monitoringTasks = new List<Task<ConsumerStateChanged>>();
-
- using (_lock.ReaderLock())
- {
- if (_isPartitioned)
- {
-
- for (var partition = 0; partition < _numberOfPartitions;
++partition)
- {
- var partitionedTopicName =
getPartitonedTopicName(partition);
-
- var consumer = CreateSubConsumer(partitionedTopicName);
- _ = _consumers.TryAdd(partitionedTopicName, consumer);
-
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected,
_cts.Token).AsTask());
- }
- }
-
- else
- {
- var consumer = CreateSubConsumer(Topic);
- _ = _consumers.TryAdd(Topic, consumer);
-
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected,
_cts.Token).AsTask());
- }
-
- Interlocked.Exchange(ref _consumerCount, monitoringTasks.Count);
- }
- var activeConsumers = 0;
- while (true)
- {
- await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
-
- for (var i = 0; i < monitoringTasks.Count; ++i)
- {
- var task = monitoringTasks[i];
- if (!task.IsCompleted)
- continue;
- var state = task.Result.ConsumerState;
- switch (state)
- {
- case ConsumerState.Active:
- ++activeConsumers;
- break;
- case ConsumerState.Disconnected:
- --activeConsumers;
- break;
- case ConsumerState.ReachedEndOfTopic:
- _state.SetState(ConsumerState.ReachedEndOfTopic);
- return;
- case ConsumerState.Faulted:
- _state.SetState(ConsumerState.Faulted);
- return;
- case ConsumerState.Unsubscribed:
- _state.SetState(ConsumerState.Unsubscribed);
- return;
- }
-
- monitoringTasks[i] =
task.Result.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
- }
-
- if (activeConsumers == 0)
- _state.SetState(ConsumerState.Disconnected);
- else if (activeConsumers == monitoringTasks.Count)
- _state.SetState(ConsumerState.Active);
- else
- _state.SetState(ConsumerState.PartiallyActive);
- }
+ await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+ await _channel.DisposeAsync().ConfigureAwait(false);
}
- private SubConsumer<TMessage> CreateSubConsumer(string topic)
- {
- var correlationId = Guid.NewGuid();
- var consumerName = _options.ConsumerName ??
$"Consumer-{correlationId:N}";
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
+ => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
- var subscribe = new CommandSubscribe
- {
- ConsumerName = consumerName,
- InitialPosition = (CommandSubscribe.InitialPositionType)
_options.InitialPosition,
- PriorityLevel = _options.PriorityLevel,
- ReadCompacted = _options.ReadCompacted,
- ReplicateSubscriptionState = _options.ReplicateSubscriptionState,
- Subscription = _options.SubscriptionName,
- Topic = topic,
- Type = (CommandSubscribe.SubType) _options.SubscriptionType
- };
-
- foreach (var property in _options.SubscriptionProperties)
- {
- var keyValue = new KeyValue { Key = property.Key, Value =
property.Value };
- subscribe.SubscriptionProperties.Add(keyValue);
- }
+ public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
+ => await InternalAcknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
- var messagePrefetchCount = _options.MessagePrefetchCount;
- var messageFactory = new MessageFactory<TMessage>(_options.Schema);
- var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
- var decompressorFactories =
CompressionFactories.DecompressorFactories();
-
- var factory = new ConsumerChannelFactory<TMessage>(correlationId,
_processManager, _connectionPool, subscribe, messagePrefetchCount,
batchHandler, messageFactory, decompressorFactories);
- var stateManager = new
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed,
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
- var initialChannel = new NotReadyChannel<TMessage>();
- var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
- var consumer = new SubConsumer<TMessage>(correlationId, ServiceUrl,
_options.SubscriptionName, topic, _processManager, initialChannel, executor,
stateManager, factory);
- var process = new ConsumerProcess(correlationId, stateManager,
consumer, _options.SubscriptionType == SubscriptionType.Failover);
- _processManager.Add(process);
- process.Start();
- return consumer;
- }
+ public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
+ => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
- private async Task<uint> GetNumberOfPartitions(string topic,
CancellationToken cancellationToken)
+ public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
{
- var connection = await _connectionPool.FindConnectionForTopic(topic,
cancellationToken).ConfigureAwait(false);
- var commandPartitionedMetadata = new
PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
- var response = await connection.Send(commandPartitionedMetadata,
cancellationToken).ConfigureAwait(false);
-
-
response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
-
- if (response.PartitionMetadataResponse.Response ==
PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
- response.PartitionMetadataResponse.Throw();
-
- return response.PartitionMetadataResponse.Partitions;
+ var command = new CommandRedeliverUnacknowledgedMessages();
+ command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
+ await _executor.Execute(() =>
InternalRedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
- private void ThrowIfDisposed()
+ public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
+ => await
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(),
cancellationToken).ConfigureAwait(false);
+
+ public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
- if (_isDisposed != 0)
- throw new ConsumerDisposedException(GetType().FullName!);
+ var unsubscribe = new CommandUnsubscribe();
+ await _executor.Execute(() => InternalUnsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
}
- public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken = default)
- {
- ThrowIfDisposed();
- var sourceTopic = Topic;
- if (_isPartitioned)
- {
- sourceTopic = getPartitonedTopicName(messageId.Partition);
- }
- await _executor.Execute(() =>
- {
- ThrowIfNotActive();
-
- using (_lock.ReaderLock())
- {
- return _consumers[sourceTopic].Acknowledge(messageId,
cancellationToken);
- }
- }, cancellationToken)
- .ConfigureAwait(false);
+ public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
+ {
+ var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
- public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
+ public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
- await _executor.Execute(() =>
- {
- ThrowIfNotActive();
-
- using (_lock.ReaderLock())
- {
- var sourceTopic = Topic;
- if (_isPartitioned)
- {
- sourceTopic =
getPartitonedTopicName(messageId.Partition);
- }
- return
_consumers[sourceTopic].AcknowledgeCumulative(messageId, cancellationToken);
- }
- }, cancellationToken)
- .ConfigureAwait(false);
+ var seek = new CommandSeek { MessagePublishTime = publishTime };
+ await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
GetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<MessageId>
GetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ private void Guard()
{
- ThrowIfNotActive();
+ if (_isDisposed != 0)
+ throw new ConsumerDisposedException(GetType().FullName!);
- if (_isPartitioned)
- {
- throw new NotImplementedException("GetLastMessageId is not
implemented for partitioned topics");
- }
- using (_lock.ReaderLock())
- {
- return await
_consumers.First().Value.GetLastMessageId(cancellationToken).ConfigureAwait(false);
- }
+ if (_faultException is not null)
+ throw new ConsumerFaultedException(_faultException);
}
- public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken = default)
+ public async Task EstablishNewChannel(CancellationToken cancellationToken)
{
- ThrowIfDisposed();
+ var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
- return await _executor.Execute(() =>
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
+ var oldChannel = _channel;
+ if (oldChannel is not null)
+ await oldChannel.DisposeAsync().ConfigureAwait(false);
+
+ _channel = channel;
}
- private async ValueTask<IMessage<TMessage>>
ReceiveMessage(CancellationToken cancellationToken)
- {
- ThrowIfNotActive();
+ public async ValueTask CloseChannel(CancellationToken cancellationToken)
+ => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
- using (_lock.ReaderLock())
- {
- if (_messagesQueue.TryDequeue(out var message))
- {
- return message;
- }
- var cts = new CancellationTokenSource();
- var linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
- while (!cancellationToken.IsCancellationRequested)
- {
- var done = false;
-
- Task<IMessage<TMessage>>[] receiveTasks =
_consumers.Values.Select(consumer =>
consumer.Receive(linkedCts.Token).AsTask()).ToArray();
- await Task.WhenAny(receiveTasks).ConfigureAwait(false);
-
- try
- {
- receiveTasks.Where(t => t.IsCompleted).ToList().ForEach(t
=>
- {
- if (t.Result == null)
- {
- return;
- }
-
- done = true;
- _messagesQueue.Enqueue(t.Result);
- }
- );
- }
- catch (Exception exception)
- {
- if (linkedCts.IsCancellationRequested)
- {
- cts.Cancel();
- }
- else
- {
- throw exception;
- }
- }
- if (done)
- {
- break;
- }
- }
- cts.Cancel();
- cts.Dispose();
- _messagesQueue.TryDequeue(out var result);
- return result;
- }
+ public async ValueTask ChannelFaulted(Exception exception)
+ {
+ _faultException = exception;
+ await DisposeChannel().ConfigureAwait(false);
}
- public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken = default)
+ private async ValueTask InternalAcknowledge(CommandAck command,
CancellationToken cancellationToken)
{
- ThrowIfDisposed();
-
- await _executor.Execute<ValueTask>(async () =>
- {
- ThrowIfNotActive();
-
- using (_lock.ReaderLock())
- {
- var tasks = messageIds.Select(n =>
- {
- var sourceTopic = Topic;
- if (_isPartitioned)
- {
- sourceTopic = getPartitonedTopicName(n.Partition);
- }
- return
_consumers[getPartitonedTopicName(n.Partition)].RedeliverUnacknowledgedMessages(cancellationToken).AsTask();
- });
- await Task.WhenAll(tasks).ConfigureAwait(false);
- }
- }, cancellationToken).ConfigureAwait(false);
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
- public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken = default)
+ private async ValueTask
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages
command, CancellationToken cancellationToken)
{
- ThrowIfDisposed();
-
- await _executor.Execute<ValueTask>(async () =>
- {
- ThrowIfNotActive();
-
- using (_lock.ReaderLock())
- {
- var tasks = _consumers.Values.Select(consumer =>
-
consumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask()
- );
- await Task.WhenAll(tasks).ConfigureAwait(false);
- }
- }, cancellationToken).ConfigureAwait(false);
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
- public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default)
+ private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
- await _executor.Execute<ValueTask>(async () =>
- {
- ThrowIfNotActive();
-
- using (_lock.ReaderLock())
- {
- if (messageId.Equals(null))
- {
- throw new ArgumentException("Illegal messageId cannot be
null");
- }
-
- var tasks = _consumers.Values.Select(consumer =>
- consumer.Seek(messageId, cancellationToken).AsTask()
- );
- await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
- _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
- }
- }, cancellationToken).ConfigureAwait(false);
+ Guard();
+ return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
}
- public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken = default)
+ private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
{
- ThrowIfDisposed();
-
- await _executor.Execute<ValueTask>(async () =>
- {
- ThrowIfNotActive();
-
- using (_lock.ReaderLock())
- {
- var tasks = _consumers.Values.Select(consumer =>
- consumer.Seek(publishTime, cancellationToken).AsTask()
- );
- await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
- _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
- }
- }, cancellationToken).ConfigureAwait(false);
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
- public async ValueTask Unsubscribe(CancellationToken cancellationToken)
+ private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- ThrowIfDisposed();
-
- await _executor.Execute<ValueTask>(async () =>
- {
- ThrowIfNotActive();
-
- using (_lock.ReaderLock())
- {
- var tasks = _consumers.Values.Select(consumer =>
- consumer.Unsubscribe(cancellationToken).AsTask()
- );
- await Task.WhenAll(tasks).ConfigureAwait(false);
- }
- }, cancellationToken).ConfigureAwait(false);
+ Guard();
+ return await _channel.Receive(cancellationToken).ConfigureAwait(false);
}
- private string getPartitonedTopicName(int partitionNumber)
+ private async ValueTask InternalUnsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
{
- return $"{Topic}-partition-{partitionNumber}";
+ Guard();
+ await _channel.Send(command, cancellationToken).ConfigureAwait(false);
}
- private void ThrowIfNotActive()
+ private async ValueTask InternalAcknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
{
- if (_state.CurrentState != ConsumerState.Active)
- throw new ConsumerNotActiveException("The consumer is not yet
activated.");
- }
+ var commandAck = _commandAckPool.Get();
+ commandAck.Type = ackType;
+ if (commandAck.MessageIds.Count == 0)
+ commandAck.MessageIds.Add(messageId.ToMessageIdData());
+ else
+ commandAck.MessageIds[0].MapFrom(messageId);
- public void Register(IEvent @event) { }
+ try
+ {
+ await _executor.Execute(() => InternalAcknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ }
+ finally
+ {
+ _commandAckPool.Return(commandAck);
+ }
+ }
}
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs
b/src/DotPulsar/Internal/ConsumerProcess.cs
index aa0f9f6..6c3584a 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -51,7 +51,7 @@ public sealed class ConsumerProcess : Process
{
var formerState = _stateManager.SetState(ConsumerState.Faulted);
if (formerState != ConsumerState.Faulted)
- ActionQueue.Enqueue(async _ => await
_consumer.ChannelFaulted(Exception!) );
+ ActionQueue.Enqueue(async _ => await
_consumer.ChannelFaulted(Exception!));
return;
}
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index 727d8a7..edb90fe 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -44,7 +44,6 @@ public sealed class DefaultExceptionHandler : IHandleException
ServiceNotReadyException _ => FaultAction.Retry,
MetadataException _ => FaultAction.Rethrow,
ConsumerNotFoundException _ => FaultAction.Retry,
- ConsumerNotActiveException _ => FaultAction.Retry,
ConsumerBusyException _ => FaultAction.Retry,
ProducerBusyException _ => FaultAction.Retry,
ProducerFencedException _ => FaultAction.Rethrow,
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index dedb8f9..66c04e1 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -130,7 +130,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
var connectedProducers = 0;
- bool[] waitingForExclusive = new bool[isPartitionedTopic ?
numberOfPartitions : 1];
+ var waitingForExclusive = new bool[isPartitionedTopic ?
numberOfPartitions : 1];
while (true)
{
@@ -386,7 +386,7 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
}
private StateManager<ProducerState> CreateStateManager()
- => new (ProducerState.Disconnected, ProducerState.Closed,
ProducerState.Faulted, ProducerState.Fenced);
+ => new(ProducerState.Disconnected, ProducerState.Closed,
ProducerState.Faulted, ProducerState.Fenced);
public void Register(IEvent @event) { }
}
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs
b/src/DotPulsar/Internal/ProducerBuilder.cs
index 647f514..dbdac4e 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -42,7 +42,6 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
_producerAccessMode =
ProducerOptions<TMessage>.DefaultProducerAccessMode;
}
-
public IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool
attachTraceInfoToMessages)
{
_attachTraceInfoToMessages = attachTraceInfoToMessages;
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs
b/src/DotPulsar/Internal/ProducerProcess.cs
index f2db79f..fc25af7 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -47,7 +47,7 @@ public sealed class ProducerProcess : Process
if (ExecutorState == ExecutorState.Faulted)
{
- ProducerState newState = Exception! is ProducerFencedException ?
ProducerState.Fenced : ProducerState.Faulted;
+ var newState = Exception! is ProducerFencedException ?
ProducerState.Fenced : ProducerState.Faulted;
var formerState = _stateManager.SetState(newState);
if (formerState != ProducerState.Faulted && formerState !=
ProducerState.Fenced)
ActionQueue.Enqueue(async _ => await
_producer.ChannelFaulted(Exception!));
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs
b/src/DotPulsar/Internal/ReaderProcess.cs
index d904710..09ad54f 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -48,7 +48,7 @@ public sealed class ReaderProcess : Process
{
var formerState = _stateManager.SetState(ReaderState.Faulted);
if (formerState != ReaderState.Faulted)
- ActionQueue.Enqueue(async _ => await
_reader.ChannelFaulted(Exception!) );
+ ActionQueue.Enqueue(async _ => await
_reader.ChannelFaulted(Exception!));
return;
}
diff --git a/src/DotPulsar/Internal/SubConsumer.cs
b/src/DotPulsar/Internal/SubConsumer.cs
deleted file mode 100644
index bbc9bd3..0000000
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal;
-
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using Microsoft.Extensions.ObjectPool;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-
-public sealed class SubConsumer<TMessage> : IContainsChannel,
IConsumer<TMessage>
-{
- private readonly Guid _correlationId;
- private readonly IRegisterEvent _eventRegister;
- private IConsumerChannel<TMessage> _channel;
- private readonly IExecute _executor;
- private readonly IStateChanged<ConsumerState> _state;
- private readonly IConsumerChannelFactory<TMessage> _factory;
- private Exception? _faultException;
- private readonly ObjectPool<CommandAck> _commandAckPool;
-
- public Uri ServiceUrl { get; }
- public string SubscriptionName { get; }
- public string Topic { get; }
- private int _isDisposed;
-
- public SubConsumer(
- Guid correlationId,
- Uri serviceUrl,
- string subscriptionName,
- string topic,
- IRegisterEvent eventRegister,
- IConsumerChannel<TMessage> initialChannel,
- IExecute executor,
- IStateChanged<ConsumerState> state,
- IConsumerChannelFactory<TMessage> factory
- )
- {
- _correlationId = correlationId;
- ServiceUrl = serviceUrl;
- SubscriptionName = subscriptionName;
- Topic = topic;
- _eventRegister = eventRegister;
- _channel = initialChannel;
- _executor = executor;
- _state = state;
- _factory = factory;
- _isDisposed = 0;
- _commandAckPool = new DefaultObjectPool<CommandAck>(new
DefaultPooledObjectPolicy<CommandAck>());
-
- _eventRegister.Register(new ConsumerCreated(_correlationId));
- }
-
- public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
- => await InternalAcknowledge(messageId,
CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
-
- private async ValueTask InternalAcknowledge(MessageId messageId,
CommandAck.AckType ackType, CancellationToken cancellationToken)
- {
- var commandAck = _commandAckPool.Get();
- commandAck.Type = ackType;
- if (commandAck.MessageIds.Count == 0)
- commandAck.MessageIds.Add(messageId.ToMessageIdData());
- else
- commandAck.MessageIds[0].MapFrom(messageId);
-
- try
- {
- await _executor.Execute(() => InternalAcknowledge(commandAck,
cancellationToken), cancellationToken).ConfigureAwait(false);
- }
- finally
- {
- _commandAckPool.Return(commandAck);
- }
- }
-
- private async ValueTask InternalAcknowledge(CommandAck command,
CancellationToken cancellationToken)
- {
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
- => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask ChannelFaulted(Exception exception)
- {
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
- }
-
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
- public async ValueTask DisposeAsync()
- {
- if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
- return;
-
- _eventRegister.Register(new ConsumerDisposed(_correlationId));
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
- }
-
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
- {
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
-
- _channel = channel;
- }
-
- public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
- {
- Guard();
- var getLastMessageId = new CommandGetLastMessageId();
- return await _channel.Send(getLastMessageId,
cancellationToken).ConfigureAwait(false);
- }
-
- public bool IsFinalState()
- => _state.IsFinalState();
-
- public bool IsFinalState(ConsumerState state)
- => _state.IsFinalState(state);
-
- public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState
state, CancellationToken cancellationToken = default)
- => await _state.StateChangedFrom(state,
cancellationToken).ConfigureAwait(false);
-
-
- public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state,
CancellationToken cancellationToken = default)
- => await _state.StateChangedTo(state,
cancellationToken).ConfigureAwait(false);
-
- public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
- {
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
- {
- var command = new CommandRedeliverUnacknowledgedMessages();
- command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
- await _executor.Execute(() =>
InternalRedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
- => await
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages
command, CancellationToken cancellationToken)
- {
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken = default)
- {
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken = default)
- {
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
-
- private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
- {
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Unsubscribe(CancellationToken cancellationToken)
- {
- Guard();
- var unsubscribe = new CommandUnsubscribe();
- await _executor.Execute(() => Unsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
- }
-
- private async ValueTask Unsubscribe(CommandUnsubscribe command,
CancellationToken cancellationToken)
- => await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
-
- private void Guard()
- {
- if (_isDisposed != 0)
- throw new ConsumerDisposedException(GetType().FullName!);
-
- if (_faultException is not null)
- throw new ConsumerFaultedException(_faultException);
- }
-
- private async ValueTask DisposeChannel()
- {
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
- }
-}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 14085f1..66284ef 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -89,12 +89,43 @@ public sealed class PulsarClient : IPulsarClient
{
ThrowIfDisposed();
- var consumer = new Consumer<TMessage>(ServiceUrl, options,
_processManager, _exceptionHandler, _connectionPool);
+ var correlationId = Guid.NewGuid();
+ var consumerName = options.ConsumerName ??
$"Consumer-{correlationId:N}";
+ var subscribe = new CommandSubscribe
+ {
+ ConsumerName = consumerName,
+ InitialPosition = (CommandSubscribe.InitialPositionType)
options.InitialPosition,
+ PriorityLevel = options.PriorityLevel,
+ ReadCompacted = options.ReadCompacted,
+ ReplicateSubscriptionState = options.ReplicateSubscriptionState,
+ Subscription = options.SubscriptionName,
+ Topic = options.Topic,
+ Type = (CommandSubscribe.SubType) options.SubscriptionType
+ };
+ foreach (var property in options.SubscriptionProperties)
+ {
+ var keyValue = new KeyValue { Key = property.Key, Value =
property.Value };
+ subscribe.SubscriptionProperties.Add(keyValue);
+ }
+
+ var messagePrefetchCount = options.MessagePrefetchCount;
+ var messageFactory = new MessageFactory<TMessage>(options.Schema);
+ var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
+ var decompressorFactories =
CompressionFactories.DecompressorFactories();
+ var factory = new ConsumerChannelFactory<TMessage>(correlationId,
_processManager, _connectionPool, subscribe, messagePrefetchCount,
batchHandler, messageFactory, decompressorFactories);
+ var stateManager = new
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed,
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+ var initialChannel = new NotReadyChannel<TMessage>();
+ var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
+ var consumer = new Consumer<TMessage>(correlationId, ServiceUrl,
options.SubscriptionName, options.Topic, _processManager, initialChannel,
executor, stateManager, factory);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer,
options.StateChangedHandler);
+ var process = new ConsumerProcess(correlationId, stateManager,
consumer, options.SubscriptionType == SubscriptionType.Failover);
+ _processManager.Add(process);
+ process.Start();
return consumer;
}
+
/// <summary>
/// Create a reader.
/// </summary>
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs
b/tests/DotPulsar.Tests/ConsumerTests.cs
index bbb23f7..fbf43c8 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -70,32 +70,6 @@ public class ConsumerTests
consumed.Should().BeEquivalentTo(produced);
}
- [Fact]
- public async Task
PartitionConsume_WhenGetLastMessageId_ThenShouldThrowException()
- {
- //Arrange
- var testRunId = Guid.NewGuid().ToString("N");
- const int partitions = 3;
- var topicName = $"consumer-tests-{testRunId}";
-
-
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
-
- //Act
- await using var client = PulsarClient.Builder()
- .ServiceUrl(_fixture.ServiceUrl)
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .Build();
-
- var consumer = client.NewConsumer(Schema.ByteArray)
- .ConsumerName($"consumer-{testRunId}")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .SubscriptionName($"subscription-{testRunId}")
- .Topic(topicName)
- .Create();
- var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
- Assert.ThrowsAsync<NotImplementedException>(async () => await
consumer.GetLastMessageId(cts.Token));
- }
-
private static async Task<IEnumerable<MessageId>>
ProduceMessages(IProducer<byte[]> producer, int numberOfMessages,
CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs
b/tests/DotPulsar.Tests/ProducerTests.cs
index db2b1ce..ed54195 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/ProducerTests.cs
@@ -175,7 +175,6 @@ public class ProducerTests
[InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive,
ProducerState.Connected, ProducerState.Fenced)]
[InlineData(ProducerAccessMode.Shared,
ProducerAccessMode.WaitForExclusive, ProducerState.Connected,
ProducerState.WaitingForExclusive)]
[InlineData(ProducerAccessMode.Exclusive,
ProducerAccessMode.WaitForExclusive, ProducerState.Connected,
ProducerState.WaitingForExclusive)]
-
public async Task
TwoProducers_WhenUsingDifferentAccessModes_ThenGoToExpectedStates(ProducerAccessMode
accessMode1, ProducerAccessMode accessMode2, ProducerState producerState1,
ProducerState producerState2)
{
//Arrange