This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 65cb69f  Partitioned Topic Consumer Support (#146)
65cb69f is described below

commit 65cb69f2f9c33bcd46d76db5738d57f8dd8046df
Author: Thomas O'Neill <[email protected]>
AuthorDate: Fri May 19 06:40:54 2023 -0400

    Partitioned Topic Consumer Support (#146)
    
    * feat(consumer): Add support for consuming partitioned topics
    
    * chore(deps): Add Nito.AsyncEx package reference
    
    * chore: Fix documentation, add new line to end of file
    
    ---------
    
    Co-authored-by: Thomas O'Neill <[email protected]>
---
 CHANGELOG.md                                       |   4 +
 samples/Consuming/Program.cs                       |   5 +-
 src/DotPulsar/ConsumerState.cs                     |   7 +-
 src/DotPulsar/DotPulsar.csproj                     |   1 +
 .../Exceptions/ConsumerNotActiveException.cs       |  21 +
 src/DotPulsar/Internal/Consumer.cs                 | 495 ++++++++++++++++-----
 src/DotPulsar/Internal/DefaultExceptionHandler.cs  |   1 +
 .../Internal/{Consumer.cs => SubConsumer.cs}       | 187 ++++----
 src/DotPulsar/PulsarClient.cs                      |  33 +-
 tests/DotPulsar.Tests/ConsumerTests.cs             |  26 ++
 10 files changed, 541 insertions(+), 239 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cfd3fab..f701407 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this 
file.
 
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to 
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [Unreleased]
+
+- Support for consuming partitioned topics
+
 ## [2.12.0] - ?
 
 ### Added
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index dd71b35..9cbddbd 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -52,8 +52,9 @@ internal static class Program
         {
             await foreach (var message in consumer.Messages(cancellationToken))
             {
-                Console.WriteLine($"Received: {message.Value()}");
-                await consumer.Acknowledge(message, cancellationToken);
+                Console.WriteLine($"Received: {message?.Value()}");
+                if (message is not null)
+                    await consumer.Acknowledge(message, cancellationToken);
             }
         }
         catch (OperationCanceledException) { }
diff --git a/src/DotPulsar/ConsumerState.cs b/src/DotPulsar/ConsumerState.cs
index d251e22..7dfd9ed 100644
--- a/src/DotPulsar/ConsumerState.cs
+++ b/src/DotPulsar/ConsumerState.cs
@@ -52,5 +52,10 @@ public enum ConsumerState : byte
     /// <summary>
     /// The consumer has unsubscribed. This is a final state.
     /// </summary>
-    Unsubscribed
+    Unsubscribed,
+
+    /// <summary>
+    /// When the topic is a partitioned topic and only some of the SubConsumers are 
active.
+    /// </summary>
+    PartiallyActive
 }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index fb5b330..bcb70a1 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -25,6 +25,7 @@
     <PackageReference Include="HashDepot" Version="2.0.3" />
     <PackageReference Include="Microsoft.Extensions.ObjectPool" 
Version="7.0.5" />
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" 
PrivateAssets="All" />
+    <PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
     <PackageReference Include="protobuf-net" Version="3.2.16" />
     <PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
   </ItemGroup>
diff --git a/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs 
b/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
new file mode 100644
index 0000000..7a2c1db
--- /dev/null
+++ b/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+    public sealed class ConsumerNotActiveException : DotPulsarException
+    {
+        public ConsumerNotActiveException(string message) : base(message) { }
+    }
+}
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index 9cb3327..7558a13 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -16,27 +16,35 @@ namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
+using DotPulsar.Extensions;
 using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
+using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
-using Microsoft.Extensions.ObjectPool;
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
+using Nito.AsyncEx;
 
-public sealed class Consumer<TMessage> : IContainsChannel, IConsumer<TMessage>
+public sealed class Consumer<TMessage> : IConsumer<TMessage>, IRegisterEvent
 {
-    private readonly Guid _correlationId;
-    private readonly IRegisterEvent _eventRegister;
-    private IConsumerChannel<TMessage> _channel;
-    private readonly ObjectPool<CommandAck> _commandAckPool;
     private readonly IExecute _executor;
-    private readonly IStateChanged<ConsumerState> _state;
-    private readonly IConsumerChannelFactory<TMessage> _factory;
+    private readonly StateManager<ConsumerState> _state;
+    private readonly ConsumerOptions<TMessage> _options;
+    private readonly ProcessManager _processManager;
+    private readonly IHandleException _exceptionHandler;
+    private readonly IConnectionPool _connectionPool;
+    private readonly CancellationTokenSource _cts;
+    private readonly ConcurrentDictionary<string, IConsumer<TMessage>> 
_consumers;
+    private readonly AsyncReaderWriterLock _lock;
+    private ConcurrentQueue<IMessage<TMessage>> _messagesQueue;
     private int _isDisposed;
+    private int _consumerCount;
+    private uint _numberOfPartitions;
+    private bool _isPartitioned;
     private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
@@ -44,29 +52,31 @@ public sealed class Consumer<TMessage> : IContainsChannel, 
IConsumer<TMessage>
     public string Topic { get; }
 
     public Consumer(
-        Guid correlationId,
         Uri serviceUrl,
-        string subscriptionName,
-        string topic,
-        IRegisterEvent eventRegister,
-        IConsumerChannel<TMessage> initialChannel,
-        IExecute executor,
-        IStateChanged<ConsumerState> state,
-        IConsumerChannelFactory<TMessage> factory)
-    {
-        _correlationId = correlationId;
+        ConsumerOptions<TMessage> options,
+        ProcessManager processManager,
+        IHandleException exceptionHandler,
+        IConnectionPool connectionPool)
+    {
+        Topic = options.Topic;
         ServiceUrl = serviceUrl;
-        SubscriptionName = subscriptionName;
-        Topic = topic;
-        _eventRegister = eventRegister;
-        _channel = initialChannel;
-        _executor = executor;
-        _state = state;
-        _factory = factory;
-        _commandAckPool = new DefaultObjectPool<CommandAck>(new 
DefaultPooledObjectPolicy<CommandAck>());
+        SubscriptionName = options.SubscriptionName;
+
+        _options = options;
+        _processManager = processManager;
+        _exceptionHandler = exceptionHandler;
+        _connectionPool = connectionPool;
+
+        _state = new StateManager<ConsumerState>(ConsumerState.Disconnected, 
ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+        _cts = new CancellationTokenSource();
+        _executor = new Executor(Guid.Empty, this, _exceptionHandler);
+        _consumers = new ConcurrentDictionary<string, IConsumer<TMessage>>();
+        _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
         _isDisposed = 0;
 
-        _eventRegister.Register(new ConsumerCreated(_correlationId));
+        _lock = new AsyncReaderWriterLock();
+
+        _ = Setup();
     }
 
     public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken)
@@ -86,140 +96,407 @@ public sealed class Consumer<TMessage> : 
IContainsChannel, IConsumer<TMessage>
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
-        _eventRegister.Register(new ConsumerDisposed(_correlationId));
-        await DisposeChannel().ConfigureAwait(false);
+        _cts.Cancel();
+        _cts.Dispose();
+
+        _state.SetState(ConsumerState.Closed);
+
+        using (_lock.ReaderLock())
+        {
+            foreach (var consumer in _consumers.Values)
+            {
+                await consumer.DisposeAsync().ConfigureAwait(false);
+            }
+        }
     }
 
-    private async ValueTask DisposeChannel()
+    private async Task Setup()
     {
-        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-        await _channel.DisposeAsync().ConfigureAwait(false);
-    }
+        await Task.Yield();
 
-    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
-        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        try
+        {
+            await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+        }
+        catch (Exception exception)
+        {
+            if (_cts.IsCancellationRequested)
+                return;
 
-    public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, 
cancellationToken).ConfigureAwait(false);
+            _faultException = exception;
+            _state.SetState(ConsumerState.Faulted);
+        }
+    }
+    private async Task Monitor()
+    {
+        _numberOfPartitions = await GetNumberOfPartitions(Topic, 
_cts.Token).ConfigureAwait(false);
+        _isPartitioned = _numberOfPartitions != 0;
+        var monitoringTasks = new List<Task<ConsumerStateChanged>>();
 
-    public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
+        using (_lock.ReaderLock())
+        {
+            if (_isPartitioned)
+            {
+
+                for (var partition = 0; partition < _numberOfPartitions; 
++partition)
+                {
+                    var partitionedTopicName = 
getPartitonedTopicName(partition);
+
+                    var consumer = CreateSubConsumer(partitionedTopicName);
+                    _ = _consumers.TryAdd(partitionedTopicName, consumer);
+                    
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask());
+                }
+            }
+
+            else
+            {
+                var consumer = CreateSubConsumer(Topic);
+                _ = _consumers.TryAdd(Topic, consumer);
+                
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask());
+            }
+
+            Interlocked.Exchange(ref _consumerCount, monitoringTasks.Count);
+        }
+        var activeConsumers = 0;
+        while (true)
+        {
+            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
+
+            for (var i = 0; i < monitoringTasks.Count; ++i)
+            {
+                var task = monitoringTasks[i];
+                if (!task.IsCompleted)
+                    continue;
+                var state = task.Result.ConsumerState;
+                switch (state)
+                {
+                    case ConsumerState.Active:
+                        ++activeConsumers;
+                        break;
+                    case ConsumerState.Disconnected:
+                        --activeConsumers;
+                        break;
+                    case ConsumerState.ReachedEndOfTopic:
+                        _state.SetState(ConsumerState.ReachedEndOfTopic);
+                        return;
+                    case ConsumerState.Faulted:
+                        _state.SetState(ConsumerState.Faulted);
+                        return;
+                    case ConsumerState.Unsubscribed:
+                        _state.SetState(ConsumerState.Unsubscribed);
+                        return;
+                }
+
+                monitoringTasks[i] = 
task.Result.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
+            }
+
+            if (activeConsumers == 0)
+                _state.SetState(ConsumerState.Disconnected);
+            else if (activeConsumers == monitoringTasks.Count)
+                _state.SetState(ConsumerState.Active);
+            else
+                _state.SetState(ConsumerState.PartiallyActive);
+        }
+    }
 
-    public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
+    private SubConsumer<TMessage> CreateSubConsumer(string topic)
     {
-        var command = new CommandRedeliverUnacknowledgedMessages();
-        command.MessageIds.AddRange(messageIds.Select(messageId => 
messageId.ToMessageIdData()));
-        await _executor.Execute(() => 
InternalRedeliverUnacknowledgedMessages(command, cancellationToken), 
cancellationToken).ConfigureAwait(false);
-    }
+        var correlationId = Guid.NewGuid();
+        var consumerName = _options.ConsumerName ?? 
$"Consumer-{correlationId:N}";
 
-    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
-        => await 
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), 
cancellationToken).ConfigureAwait(false);
+        var subscribe = new CommandSubscribe
+        {
+            ConsumerName = consumerName,
+            InitialPosition = (CommandSubscribe.InitialPositionType) 
_options.InitialPosition,
+            PriorityLevel = _options.PriorityLevel,
+            ReadCompacted = _options.ReadCompacted,
+            ReplicateSubscriptionState = _options.ReplicateSubscriptionState,
+            Subscription = _options.SubscriptionName,
+            Topic = topic,
+            Type = (CommandSubscribe.SubType) _options.SubscriptionType
+        };
+
+        foreach (var property in _options.SubscriptionProperties)
+        {
+            var keyValue = new KeyValue { Key = property.Key, Value = 
property.Value };
+            subscribe.SubscriptionProperties.Add(keyValue);
+        }
 
-    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
+        var messagePrefetchCount = _options.MessagePrefetchCount;
+        var messageFactory = new MessageFactory<TMessage>(_options.Schema);
+        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
+        var decompressorFactories = 
CompressionFactories.DecompressorFactories();
+
+        var factory = new ConsumerChannelFactory<TMessage>(correlationId, 
_processManager, _connectionPool, subscribe, messagePrefetchCount, 
batchHandler, messageFactory, decompressorFactories);
+        var stateManager = new 
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, 
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+        var initialChannel = new NotReadyChannel<TMessage>();
+        var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
+        var consumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, 
_options.SubscriptionName, topic, _processManager, initialChannel, executor, 
stateManager, factory);
+        var process = new ConsumerProcess(correlationId, stateManager, 
consumer, _options.SubscriptionType == SubscriptionType.Failover);
+        _processManager.Add(process);
+        process.Start();
+        return consumer;
+    }
+
+    private async Task<uint> GetNumberOfPartitions(string topic, 
CancellationToken cancellationToken)
     {
-        var unsubscribe = new CommandUnsubscribe();
-        await _executor.Execute(() => InternalUnsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        var connection = await _connectionPool.FindConnectionForTopic(topic, 
cancellationToken).ConfigureAwait(false);
+        var commandPartitionedMetadata = new 
PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
+        var response = await connection.Send(commandPartitionedMetadata, 
cancellationToken).ConfigureAwait(false);
+
+        
response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
+
+        if (response.PartitionMetadataResponse.Response == 
PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+            response.PartitionMetadataResponse.Throw();
+
+        return response.PartitionMetadataResponse.Partitions;
     }
 
-    public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
+    private void ThrowIfDisposed()
+    {
+        if (_isDisposed != 0)
+            throw new ConsumerDisposedException(GetType().FullName!);
+    }
+    public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken = default)
     {
-        var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        var sourceTopic = Topic;
+        if (_isPartitioned)
+        {
+            sourceTopic = getPartitonedTopicName(messageId.Partition);
+        }
+        await _executor.Execute(() =>
+                {
+                    ThrowIfNotActive();
+
+                    using (_lock.ReaderLock())
+                    {
+                        return _consumers[sourceTopic].Acknowledge(messageId, 
cancellationToken);
+                    }
+                }, cancellationToken)
+                .ConfigureAwait(false);
     }
 
-    public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken)
+    public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
     {
-        var seek = new CommandSeek { MessagePublishTime = publishTime };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        await _executor.Execute(() =>
+            {
+                ThrowIfNotActive();
+
+                using (_lock.ReaderLock())
+                {
+                    var sourceTopic = Topic;
+                    if (_isPartitioned)
+                    {
+                        sourceTopic = 
getPartitonedTopicName(messageId.Partition);
+                    }
+                    return 
_consumers[sourceTopic].AcknowledgeCumulative(messageId, cancellationToken);
+                }
+            }, cancellationToken)
+            .ConfigureAwait(false);
     }
 
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
     {
+        ThrowIfDisposed();
+
         var getLastMessageId = new CommandGetLastMessageId();
-        return await _executor.Execute(() => 
InternalGetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        return await _executor.Execute(() => 
GetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
-    private void Guard()
+    private async ValueTask<MessageId> 
GetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
     {
-        if (_isDisposed != 0)
-            throw new ConsumerDisposedException(GetType().FullName!);
+        ThrowIfNotActive();
 
-        if (_faultException is not null)
-            throw new ConsumerFaultedException(_faultException);
+        if (_isPartitioned)
+        {
+            throw new NotImplementedException("GetLastMessageId is not 
implemented for partitioned topics");
+        }
+        using (_lock.ReaderLock())
+        {
+            return await 
_consumers.First().Value.GetLastMessageId(cancellationToken).ConfigureAwait(false);
+        }
     }
 
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken = default)
     {
-        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
 
-        var oldChannel = _channel;
-        if (oldChannel is not null)
-            await oldChannel.DisposeAsync().ConfigureAwait(false);
-
-        _channel = channel;
+        return await _executor.Execute(() => 
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask CloseChannel(CancellationToken cancellationToken)
-        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask ChannelFaulted(Exception exception)
+    private async ValueTask<IMessage<TMessage>> 
ReceiveMessage(CancellationToken cancellationToken)
     {
-        _faultException = exception;
-        await DisposeChannel().ConfigureAwait(false);
+        ThrowIfNotActive();
+
+        using (_lock.ReaderLock())
+        {
+            if (_messagesQueue.TryDequeue(out var message))
+            {
+                return message;
+            }
+            var cts = new CancellationTokenSource();
+            var linkedCts = 
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                var done = false;
+
+                Task<IMessage<TMessage>>[] receiveTasks = 
_consumers.Values.Select(consumer => 
consumer.Receive(linkedCts.Token).AsTask()).ToArray();
+                await Task.WhenAny(receiveTasks).ConfigureAwait(false);
+
+                try
+                {
+                    receiveTasks.Where(t => t.IsCompleted).ToList().ForEach(t 
=>
+                        {
+                            if (t.Result == null)
+                            {
+                                return;
+                            }
+
+                            done = true;
+                            _messagesQueue.Enqueue(t.Result);
+                        }
+                    );
+                }
+                catch (Exception exception)
+                {
+                    if (linkedCts.IsCancellationRequested)
+                    {
+                        cts.Cancel();
+                    }
+                    else
+                    {
+                        throw exception;
+                    }
+                }
+                if (done)
+                {
+                    break;
+                }
+            }
+            cts.Cancel();
+            cts.Dispose();
+            _messagesQueue.TryDequeue(out var result);
+            return result;
+        }
     }
 
-    private async ValueTask InternalAcknowledge(CommandAck command, 
CancellationToken cancellationToken)
+    public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken = default)
     {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        await _executor.Execute<ValueTask>(async () =>
+        {
+            ThrowIfNotActive();
+
+            using (_lock.ReaderLock())
+            {
+                var tasks = messageIds.Select(n =>
+                {
+                    var sourceTopic = Topic;
+                    if (_isPartitioned)
+                    {
+                        sourceTopic = getPartitonedTopicName(n.Partition);
+                    }
+                    return 
_consumers[getPartitonedTopicName(n.Partition)].RedeliverUnacknowledgedMessages(cancellationToken).AsTask();
+                });
+                await Task.WhenAll(tasks).ConfigureAwait(false);
+            }
+        }, cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask 
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages 
command, CancellationToken cancellationToken)
+    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken = default)
     {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        await _executor.Execute<ValueTask>(async () =>
+        {
+            ThrowIfNotActive();
+
+            using (_lock.ReaderLock())
+            {
+                var tasks = _consumers.Values.Select(consumer =>
+                    
consumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask()
+                );
+                await Task.WhenAll(tasks).ConfigureAwait(false);
+            }
+        }, cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask<MessageId> 
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
+    public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken = default)
     {
-        Guard();
-        return await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        await _executor.Execute<ValueTask>(async () =>
+        {
+            ThrowIfNotActive();
+
+            using (_lock.ReaderLock())
+            {
+                if (messageId.Equals(null))
+                {
+                    throw new ArgumentException("Illegal messageId cannot be 
null");
+                }
+
+                var tasks = _consumers.Values.Select(consumer =>
+                    consumer.Seek(messageId, cancellationToken).AsTask()
+                );
+                await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
+                _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
+            }
+        }, cancellationToken).ConfigureAwait(false);
     }
 
-    private async Task InternalSeek(CommandSeek command, CancellationToken 
cancellationToken)
+    public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken = default)
     {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        await _executor.Execute<ValueTask>(async () =>
+        {
+            ThrowIfNotActive();
+
+            using (_lock.ReaderLock())
+            {
+                var tasks = _consumers.Values.Select(consumer =>
+                    consumer.Seek(publishTime, cancellationToken).AsTask()
+                );
+                await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
+                _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
+            }
+        }, cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
+    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
     {
-        Guard();
-        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        await _executor.Execute<ValueTask>(async () =>
+        {
+            ThrowIfNotActive();
+
+            using (_lock.ReaderLock())
+            {
+                var tasks = _consumers.Values.Select(consumer =>
+                    consumer.Unsubscribe(cancellationToken).AsTask()
+                );
+                await Task.WhenAll(tasks).ConfigureAwait(false);
+            }
+        }, cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask InternalUnsubscribe(CommandUnsubscribe command, 
CancellationToken cancellationToken)
+    private string getPartitonedTopicName(int partitionNumber)
     {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        return $"{Topic}-partition-{partitionNumber}";
     }
 
-    private async ValueTask InternalAcknowledge(MessageId messageId, 
CommandAck.AckType ackType, CancellationToken cancellationToken)
+    private void ThrowIfNotActive()
     {
-        var commandAck = _commandAckPool.Get();
-        commandAck.Type = ackType;
-        if (commandAck.MessageIds.Count == 0)
-            commandAck.MessageIds.Add(messageId.ToMessageIdData());
-        else
-            commandAck.MessageIds[0].MapFrom(messageId);
-
-        try
-        {
-            await _executor.Execute(() => InternalAcknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-        finally
-        {
-            _commandAckPool.Return(commandAck);
-        }
+        if (_state.CurrentState != ConsumerState.Active)
+            throw new ConsumerNotActiveException("The consumer is not yet 
activated.");
     }
+
+    public void Register(IEvent @event) { }
 }
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs 
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index edb90fe..727d8a7 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -44,6 +44,7 @@ public sealed class DefaultExceptionHandler : IHandleException
             ServiceNotReadyException _ => FaultAction.Retry,
             MetadataException _ => FaultAction.Rethrow,
             ConsumerNotFoundException _ => FaultAction.Retry,
+            ConsumerNotActiveException _ => FaultAction.Retry,
             ConsumerBusyException _ => FaultAction.Retry,
             ProducerBusyException _ => FaultAction.Retry,
             ProducerFencedException _ => FaultAction.Rethrow,
diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/SubConsumer.cs
similarity index 86%
copy from src/DotPulsar/Internal/Consumer.cs
copy to src/DotPulsar/Internal/SubConsumer.cs
index 9cb3327..bbc9bd3 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/SubConsumer.cs
@@ -27,23 +27,23 @@ using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 
-public sealed class Consumer<TMessage> : IContainsChannel, IConsumer<TMessage>
+public sealed class SubConsumer<TMessage> : IContainsChannel, 
IConsumer<TMessage>
 {
     private readonly Guid _correlationId;
     private readonly IRegisterEvent _eventRegister;
     private IConsumerChannel<TMessage> _channel;
-    private readonly ObjectPool<CommandAck> _commandAckPool;
     private readonly IExecute _executor;
     private readonly IStateChanged<ConsumerState> _state;
     private readonly IConsumerChannelFactory<TMessage> _factory;
-    private int _isDisposed;
     private Exception? _faultException;
+    private readonly ObjectPool<CommandAck> _commandAckPool;
 
     public Uri ServiceUrl { get; }
     public string SubscriptionName { get; }
     public string Topic { get; }
+    private int _isDisposed;
 
-    public Consumer(
+    public SubConsumer(
         Guid correlationId,
         Uri serviceUrl,
         string subscriptionName,
@@ -52,7 +52,8 @@ public sealed class Consumer<TMessage> : IContainsChannel, 
IConsumer<TMessage>
         IConsumerChannel<TMessage> initialChannel,
         IExecute executor,
         IStateChanged<ConsumerState> state,
-        IConsumerChannelFactory<TMessage> factory)
+        IConsumerChannelFactory<TMessage> factory
+    )
     {
         _correlationId = correlationId;
         ServiceUrl = serviceUrl;
@@ -63,127 +64,129 @@ public sealed class Consumer<TMessage> : 
IContainsChannel, IConsumer<TMessage>
         _executor = executor;
         _state = state;
         _factory = factory;
-        _commandAckPool = new DefaultObjectPool<CommandAck>(new 
DefaultPooledObjectPolicy<CommandAck>());
         _isDisposed = 0;
+        _commandAckPool = new DefaultObjectPool<CommandAck>(new 
DefaultPooledObjectPolicy<CommandAck>());
 
         _eventRegister.Register(new ConsumerCreated(_correlationId));
     }
 
-    public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState 
state, CancellationToken cancellationToken)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
-    public bool IsFinalState()
-        => _state.IsFinalState();
+    public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
+          => await InternalAcknowledge(messageId, 
CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
 
-    public bool IsFinalState(ConsumerState state)
-        => _state.IsFinalState(state);
 
-    public async ValueTask DisposeAsync()
+    private async ValueTask InternalAcknowledge(MessageId messageId, 
CommandAck.AckType ackType, CancellationToken cancellationToken)
     {
-        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
-            return;
+        var commandAck = _commandAckPool.Get();
+        commandAck.Type = ackType;
+        if (commandAck.MessageIds.Count == 0)
+            commandAck.MessageIds.Add(messageId.ToMessageIdData());
+        else
+            commandAck.MessageIds[0].MapFrom(messageId);
 
-        _eventRegister.Register(new ConsumerDisposed(_correlationId));
-        await DisposeChannel().ConfigureAwait(false);
+        try
+        {
+            await _executor.Execute(() => InternalAcknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+        finally
+        {
+            _commandAckPool.Return(commandAck);
+        }
     }
 
-    private async ValueTask DisposeChannel()
+    private async ValueTask InternalAcknowledge(CommandAck command, 
CancellationToken cancellationToken)
     {
-        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-        await _channel.DisposeAsync().ConfigureAwait(false);
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
-        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, 
cancellationToken).ConfigureAwait(false);
-
     public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
         => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
 
-    public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
+    public async ValueTask ChannelFaulted(Exception exception)
     {
-        var command = new CommandRedeliverUnacknowledgedMessages();
-        command.MessageIds.AddRange(messageIds.Select(messageId => 
messageId.ToMessageIdData()));
-        await _executor.Execute(() => 
InternalRedeliverUnacknowledgedMessages(command, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        _faultException = exception;
+        await DisposeChannel().ConfigureAwait(false);
     }
 
-    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
-        => await 
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), 
cancellationToken).ConfigureAwait(false);
+    public async ValueTask CloseChannel(CancellationToken cancellationToken)
+        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
 
-    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
+    public async ValueTask DisposeAsync()
     {
-        var unsubscribe = new CommandUnsubscribe();
-        await _executor.Execute(() => InternalUnsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-    }
+        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+            return;
 
-    public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
-    {
-        var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        _eventRegister.Register(new ConsumerDisposed(_correlationId));
+        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+        await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
-    public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken)
+    public async Task EstablishNewChannel(CancellationToken cancellationToken)
     {
-        var seek = new CommandSeek { MessagePublishTime = publishTime };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+
+        var oldChannel = _channel;
+        if (oldChannel is not null)
+            await oldChannel.DisposeAsync().ConfigureAwait(false);
+
+        _channel = channel;
     }
 
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
     {
+        Guard();
         var getLastMessageId = new CommandGetLastMessageId();
-        return await _executor.Execute(() => 
InternalGetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        return await _channel.Send(getLastMessageId, 
cancellationToken).ConfigureAwait(false);
     }
 
-    private void Guard()
-    {
-        if (_isDisposed != 0)
-            throw new ConsumerDisposedException(GetType().FullName!);
+    public bool IsFinalState()
+        => _state.IsFinalState();
 
-        if (_faultException is not null)
-            throw new ConsumerFaultedException(_faultException);
-    }
+    public bool IsFinalState(ConsumerState state)
+        => _state.IsFinalState(state);
 
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
-    {
-        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+    public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState 
state, CancellationToken cancellationToken = default)
+        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
 
-        var oldChannel = _channel;
-        if (oldChannel is not null)
-            await oldChannel.DisposeAsync().ConfigureAwait(false);
 
-        _channel = channel;
-    }
+    public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken = default)
+        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
 
-    public async ValueTask CloseChannel(CancellationToken cancellationToken)
-        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
+        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
 
-    public async ValueTask ChannelFaulted(Exception exception)
+    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
     {
-        _faultException = exception;
-        await DisposeChannel().ConfigureAwait(false);
+        Guard();
+        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask InternalAcknowledge(CommandAck command, 
CancellationToken cancellationToken)
+    public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
     {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        var command = new CommandRedeliverUnacknowledgedMessages();
+        command.MessageIds.AddRange(messageIds.Select(messageId => 
messageId.ToMessageIdData()));
+        await _executor.Execute(() => 
InternalRedeliverUnacknowledgedMessages(command, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
+    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
+        => await 
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), 
cancellationToken).ConfigureAwait(false);
+
     private async ValueTask 
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages 
command, CancellationToken cancellationToken)
     {
         Guard();
         await _channel.Send(command, cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask<MessageId> 
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
+    public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken = default)
     {
-        Guard();
-        return await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+        var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+    }
+
+    public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken = default)
+    {
+        var seek = new CommandSeek { MessagePublishTime = publishTime };
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
     private async Task InternalSeek(CommandSeek command, CancellationToken 
cancellationToken)
@@ -192,34 +195,28 @@ public sealed class Consumer<TMessage> : 
IContainsChannel, IConsumer<TMessage>
         await _channel.Send(command, cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
+    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
     {
         Guard();
-        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+        var unsubscribe = new CommandUnsubscribe();
+        await _executor.Execute(() => Unsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask InternalUnsubscribe(CommandUnsubscribe command, 
CancellationToken cancellationToken)
+    private async ValueTask Unsubscribe(CommandUnsubscribe command, 
CancellationToken cancellationToken)
+        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+
+    private void Guard()
     {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        if (_isDisposed != 0)
+            throw new ConsumerDisposedException(GetType().FullName!);
+
+        if (_faultException is not null)
+            throw new ConsumerFaultedException(_faultException);
     }
 
-    private async ValueTask InternalAcknowledge(MessageId messageId, 
CommandAck.AckType ackType, CancellationToken cancellationToken)
+    private async ValueTask DisposeChannel()
     {
-        var commandAck = _commandAckPool.Get();
-        commandAck.Type = ackType;
-        if (commandAck.MessageIds.Count == 0)
-            commandAck.MessageIds.Add(messageId.ToMessageIdData());
-        else
-            commandAck.MessageIds[0].MapFrom(messageId);
-
-        try
-        {
-            await _executor.Execute(() => InternalAcknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-        finally
-        {
-            _commandAckPool.Return(commandAck);
-        }
+        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+        await _channel.DisposeAsync().ConfigureAwait(false);
     }
 }
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 66284ef..14085f1 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -89,43 +89,13 @@ public sealed class PulsarClient : IPulsarClient
     {
         ThrowIfDisposed();
 
-        var correlationId = Guid.NewGuid();
-        var consumerName = options.ConsumerName ?? 
$"Consumer-{correlationId:N}";
-        var subscribe = new CommandSubscribe
-        {
-            ConsumerName = consumerName,
-            InitialPosition = (CommandSubscribe.InitialPositionType) 
options.InitialPosition,
-            PriorityLevel = options.PriorityLevel,
-            ReadCompacted = options.ReadCompacted,
-            ReplicateSubscriptionState = options.ReplicateSubscriptionState,
-            Subscription = options.SubscriptionName,
-            Topic = options.Topic,
-            Type = (CommandSubscribe.SubType) options.SubscriptionType
-        };
+        var consumer = new Consumer<TMessage>(ServiceUrl, options, 
_processManager, _exceptionHandler, _connectionPool);
 
-        foreach (var property in options.SubscriptionProperties)
-        {
-            var keyValue = new KeyValue { Key = property.Key, Value = 
property.Value };
-            subscribe.SubscriptionProperties.Add(keyValue);
-        }
-
-        var messagePrefetchCount = options.MessagePrefetchCount;
-        var messageFactory = new MessageFactory<TMessage>(options.Schema);
-        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
-        var decompressorFactories = 
CompressionFactories.DecompressorFactories();
-        var factory = new ConsumerChannelFactory<TMessage>(correlationId, 
_processManager, _connectionPool, subscribe, messagePrefetchCount, 
batchHandler, messageFactory, decompressorFactories);
-        var stateManager = new 
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, 
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
-        var initialChannel = new NotReadyChannel<TMessage>();
-        var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
-        var consumer = new Consumer<TMessage>(correlationId, ServiceUrl, 
options.SubscriptionName, options.Topic, _processManager, initialChannel, 
executor, stateManager, factory);
         if (options.StateChangedHandler is not null)
             _ = StateMonitor.MonitorConsumer(consumer, 
options.StateChangedHandler);
-        var process = new ConsumerProcess(correlationId, stateManager, 
consumer, options.SubscriptionType == SubscriptionType.Failover);
-        _processManager.Add(process);
-        process.Start();
         return consumer;
     }
 
     /// <summary>
     /// Create a reader.
     /// </summary>
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs b/tests/DotPulsar.Tests/ConsumerTests.cs
index fbf43c8..bbb23f7 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -70,6 +70,34 @@ public class ConsumerTests
         consumed.Should().BeEquivalentTo(produced);
     }
 
+    [Fact]
+    public async Task 
PartitionConsume_WhenGetLastMessageId_ThenShouldThrowException()
+    {
+        //Arrange
+        var testRunId = Guid.NewGuid().ToString("N");
+        const int partitions = 3;
+        var topicName = $"consumer-tests-{testRunId}";
+
+        
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", 
partitions);
+
+        await using var client = PulsarClient.Builder()
+            .ServiceUrl(_fixture.ServiceUrl)
+            .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+            .Build();
+
+        var consumer = client.NewConsumer(Schema.ByteArray)
+            .ConsumerName($"consumer-{testRunId}")
+            .InitialPosition(SubscriptionInitialPosition.Earliest)
+            .SubscriptionName($"subscription-{testRunId}")
+            .Topic(topicName)
+            .Create();
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
+
+        //Act & Assert
+        // Await the assertion; an unobserved task would let the test pass.
+        await Assert.ThrowsAsync<NotImplementedException>(async () => await 
consumer.GetLastMessageId(cts.Token));
+    }
+
     private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer<byte[]> producer, int numberOfMessages, 
CancellationToken ct)
     {
         var messageIds = new MessageId[numberOfMessages];

Reply via email to