This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eb8a54c  Spring cleaning (#161)
eb8a54c is described below

commit eb8a54c87391dfb77c2b3444be2df5696a06b456
Author: entvex <[email protected]>
AuthorDate: Fri Jun 2 09:17:01 2023 +0200

    Spring cleaning (#161)
    
    * spring cleaning
    
    * spring cleaning
    
    * reformat and cleanup code profile: DotPulsar: Full Cleanup
    
    ---------
    
    Co-authored-by: David Jensen <[email protected]>
---
 CHANGELOG.md                                       |   2 -
 samples/Consuming/Program.cs                       |   5 +-
 samples/Processing/LoggerExtensions.cs             |   2 +-
 samples/Processing/Worker.cs                       |  10 +-
 src/DotPulsar/ConsumerState.cs                     |   7 +-
 src/DotPulsar/DotPulsar.csproj                     |   1 -
 .../Exceptions/ConsumerNotActiveException.cs       |  21 -
 src/DotPulsar/Internal/AsyncQueueWithCursor.cs     |   2 +-
 src/DotPulsar/Internal/Consumer.cs                 | 495 +++++----------------
 src/DotPulsar/Internal/ConsumerProcess.cs          |   2 +-
 src/DotPulsar/Internal/DefaultExceptionHandler.cs  |   1 -
 src/DotPulsar/Internal/Producer.cs                 |   4 +-
 src/DotPulsar/Internal/ProducerBuilder.cs          |   1 -
 src/DotPulsar/Internal/ProducerProcess.cs          |   2 +-
 src/DotPulsar/Internal/ReaderProcess.cs            |   2 +-
 src/DotPulsar/Internal/SubConsumer.cs              | 222 ---------
 src/DotPulsar/PulsarClient.cs                      |  33 +-
 tests/DotPulsar.Tests/ConsumerTests.cs             |  26 --
 tests/DotPulsar.Tests/ProducerTests.cs             |   1 -
 19 files changed, 156 insertions(+), 683 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d84fc4c..96ab6c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,8 +6,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 
 ## [UnReleased]
 
-- Support for consuming partitioned topics
-
 ### Fixed
 
 - Fixed issue preventing reader to correctly go into `Faulted` state.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 9cbddbd..dd71b35 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -52,9 +52,8 @@ internal static class Program
         {
             await foreach (var message in consumer.Messages(cancellationToken))
             {
-                Console.WriteLine($"Received: {message?.Value()}");
-                if (message is not null)
-                    await consumer.Acknowledge(message, cancellationToken);
+                Console.WriteLine($"Received: {message.Value()}");
+                await consumer.Acknowledge(message, cancellationToken);
             }
         }
         catch (OperationCanceledException) { }
diff --git a/samples/Processing/LoggerExtensions.cs b/samples/Processing/LoggerExtensions.cs
index e728b12..bc5bf45 100644
--- a/samples/Processing/LoggerExtensions.cs
+++ b/samples/Processing/LoggerExtensions.cs
@@ -74,7 +74,7 @@ public static partial class LoggerExtensions
     public static ValueTask ConsumerRegainedConnection(this ILogger logger, 
IConsumer consumer, ConsumerState state, string ticketId, CancellationToken 
cancellationToken)
     {
         logger.ConsumerRegainedConnection(consumer.Topic);
-        return ValueTask.CompletedTask;  // If an alert has been opened, this 
is where we can close it again
+        return ValueTask.CompletedTask; // If an alert has been opened, this 
is where we can close it again
     }
 
     [LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "The 
consumer for topic '{topic}' has regained the connection")]
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index e471b29..d49bc64 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -28,7 +28,7 @@ public class Worker : BackgroundService
     {
         await using var client = PulsarClient.Builder()
             .ExceptionHandler(_logger.PulsarClientException) // Optional
-            .Build(); // Connecting to pulsar://localhost:6650
+            .Build();                                        // Connecting to 
pulsar://localhost:6650
 
         await using var consumer = client.NewConsumer(Schema.String)
             .StateChangedHandler(_logger.ConsumerChangedState) // Optional
@@ -36,10 +36,10 @@ public class Worker : BackgroundService
             .Topic("persistent://public/default/mytopic")
             .Create();
 
-        _ = consumer.DelayedStateMonitor( // Recommended way of ignoring the 
short disconnects expected when working with a distributed system
-            ConsumerState.Active, // Operational state
-            TimeSpan.FromSeconds(5), // The amount of time allowed in 
non-operational state before we act
-            _logger.ConsumerLostConnection, // Invoked if we are NOT back in 
operational state after 5 seconds
+        _ = consumer.DelayedStateMonitor(       // Recommended way of ignoring 
the short disconnects expected when working with a distributed system
+            ConsumerState.Active,               // Operational state
+            TimeSpan.FromSeconds(5),            // The amount of time allowed 
in non-operational state before we act
+            _logger.ConsumerLostConnection,     // Invoked if we are NOT back 
in operational state after 5 seconds
             _logger.ConsumerRegainedConnection, // Invoked when we are in 
operational state again
             cancellationToken);
 
diff --git a/src/DotPulsar/ConsumerState.cs b/src/DotPulsar/ConsumerState.cs
index 7dfd9ed..d251e22 100644
--- a/src/DotPulsar/ConsumerState.cs
+++ b/src/DotPulsar/ConsumerState.cs
@@ -52,10 +52,5 @@ public enum ConsumerState : byte
     /// <summary>
     /// The consumer has unsubscribed. This is a final state.
     /// </summary>
-    Unsubscribed,
-
-    /// <summary>
-    /// When the topic is a partition topic and some of the SubConsumers are 
active.
-    /// </summary>
-    PartiallyActive
+    Unsubscribed
 }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 0675118..64099f1 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -25,7 +25,6 @@
     <PackageReference Include="HashDepot" Version="2.0.3" />
     <PackageReference Include="Microsoft.Extensions.ObjectPool" 
Version="7.0.5" />
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" 
PrivateAssets="All" />
-    <PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
     <PackageReference Include="protobuf-net" Version="3.2.16" />
     <PackageReference Include="System.IO.Pipelines" Version="7.0.0" />
   </ItemGroup>
diff --git a/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs b/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
deleted file mode 100644
index 7a2c1db..0000000
--- a/src/DotPulsar/Exceptions/ConsumerNotActiveException.cs
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Exceptions
-{
-    public sealed class ConsumerNotActiveException : DotPulsarException
-    {
-        public ConsumerNotActiveException(string message) : base(message) { }
-    }
-}
diff --git a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
index f7f3824..4855a84 100644
--- a/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
+++ b/src/DotPulsar/Internal/AsyncQueueWithCursor.cs
@@ -179,7 +179,7 @@ public sealed class AsyncQueueWithCursor<T> : IAsyncDisposable where T : IDispos
         }
         finally
         {
-            bool shouldThrow = _cursorNextItemTcs is not null && 
_cursorNextItemTcs.Task.IsCanceled;
+            var shouldThrow = _cursorNextItemTcs is not null && 
_cursorNextItemTcs.Task.IsCanceled;
 
             lock (_queue)
             {
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 7558a13..9cb3327 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -16,35 +16,27 @@ namespace DotPulsar.Internal;
 
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
-using DotPulsar.Extensions;
 using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Compression;
+using DotPulsar.Internal.Events;
 using DotPulsar.Internal.Extensions;
 using DotPulsar.Internal.PulsarApi;
+using Microsoft.Extensions.ObjectPool;
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
-using Nito.AsyncEx;
 
-public sealed class Consumer<TMessage> : IConsumer<TMessage>, IRegisterEvent
+public sealed class Consumer<TMessage> : IContainsChannel, IConsumer<TMessage>
 {
+    private readonly Guid _correlationId;
+    private readonly IRegisterEvent _eventRegister;
+    private IConsumerChannel<TMessage> _channel;
+    private readonly ObjectPool<CommandAck> _commandAckPool;
     private readonly IExecute _executor;
-    private readonly StateManager<ConsumerState> _state;
-    private readonly ConsumerOptions<TMessage> _options;
-    private readonly ProcessManager _processManager;
-    private readonly IHandleException _exceptionHandler;
-    private readonly IConnectionPool _connectionPool;
-    private readonly CancellationTokenSource _cts;
-    private readonly ConcurrentDictionary<string, IConsumer<TMessage>> 
_consumers;
-    private readonly AsyncReaderWriterLock _lock;
-    private ConcurrentQueue<IMessage<TMessage>> _messagesQueue;
+    private readonly IStateChanged<ConsumerState> _state;
+    private readonly IConsumerChannelFactory<TMessage> _factory;
     private int _isDisposed;
-    private int _consumerCount;
-    private uint _numberOfPartitions;
-    private bool _isPartitioned;
     private Exception? _faultException;
 
     public Uri ServiceUrl { get; }
@@ -52,31 +44,29 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>, IRegisterEvent
     public string Topic { get; }
 
     public Consumer(
+        Guid correlationId,
         Uri serviceUrl,
-        ConsumerOptions<TMessage> options,
-        ProcessManager processManager,
-        IHandleException exceptionHandler,
-        IConnectionPool connectionPool)
-    {
-        Topic = options.Topic;
+        string subscriptionName,
+        string topic,
+        IRegisterEvent eventRegister,
+        IConsumerChannel<TMessage> initialChannel,
+        IExecute executor,
+        IStateChanged<ConsumerState> state,
+        IConsumerChannelFactory<TMessage> factory)
+    {
+        _correlationId = correlationId;
         ServiceUrl = serviceUrl;
-        SubscriptionName = options.SubscriptionName;
-
-        _options = options;
-        _processManager = processManager;
-        _exceptionHandler = exceptionHandler;
-        _connectionPool = connectionPool;
-
-        _state = new StateManager<ConsumerState>(ConsumerState.Disconnected, 
ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
-        _cts = new CancellationTokenSource();
-        _executor = new Executor(Guid.Empty, this, _exceptionHandler);
-        _consumers = new ConcurrentDictionary<string, IConsumer<TMessage>>();
-        _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
+        SubscriptionName = subscriptionName;
+        Topic = topic;
+        _eventRegister = eventRegister;
+        _channel = initialChannel;
+        _executor = executor;
+        _state = state;
+        _factory = factory;
+        _commandAckPool = new DefaultObjectPool<CommandAck>(new 
DefaultPooledObjectPolicy<CommandAck>());
         _isDisposed = 0;
 
-        _lock = new AsyncReaderWriterLock();
-
-        _ = Setup();
+        _eventRegister.Register(new ConsumerCreated(_correlationId));
     }
 
     public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken)
@@ -96,407 +86,140 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>, IRegisterEvent
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
-        _cts.Cancel();
-        _cts.Dispose();
-
-        _state.SetState(ConsumerState.Closed);
-
-        using (_lock.ReaderLock())
-        {
-            foreach (var consumer in _consumers.Values)
-            {
-                await consumer.DisposeAsync().ConfigureAwait(false);
-            }
-        }
+        _eventRegister.Register(new ConsumerDisposed(_correlationId));
+        await DisposeChannel().ConfigureAwait(false);
     }
 
-    private async Task Setup()
+    private async ValueTask DisposeChannel()
     {
-        await Task.Yield();
-
-        try
-        {
-            await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
-        }
-        catch (Exception exception)
-        {
-            if (_cts.IsCancellationRequested)
-                return;
-
-            _faultException = exception;
-            _state.SetState(ConsumerState.Faulted);
-        }
-    }
-    private async Task Monitor()
-    {
-        _numberOfPartitions = await GetNumberOfPartitions(Topic, 
_cts.Token).ConfigureAwait(false);
-        _isPartitioned = _numberOfPartitions != 0;
-        var monitoringTasks = new List<Task<ConsumerStateChanged>>();
-
-        using (_lock.ReaderLock())
-        {
-            if (_isPartitioned)
-            {
-
-                for (var partition = 0; partition < _numberOfPartitions; 
++partition)
-                {
-                    var partitionedTopicName = 
getPartitonedTopicName(partition);
-
-                    var consumer = CreateSubConsumer(partitionedTopicName);
-                    _ = _consumers.TryAdd(partitionedTopicName, consumer);
-                    
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask());
-                }
-            }
-
-            else
-            {
-                var consumer = CreateSubConsumer(Topic);
-                _ = _consumers.TryAdd(Topic, consumer);
-                
monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected, 
_cts.Token).AsTask());
-            }
-
-            Interlocked.Exchange(ref _consumerCount, monitoringTasks.Count);
-        }
-        var activeConsumers = 0;
-        while (true)
-        {
-            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
-
-            for (var i = 0; i < monitoringTasks.Count; ++i)
-            {
-                var task = monitoringTasks[i];
-                if (!task.IsCompleted)
-                    continue;
-                var state = task.Result.ConsumerState;
-                switch (state)
-                {
-                    case ConsumerState.Active:
-                        ++activeConsumers;
-                        break;
-                    case ConsumerState.Disconnected:
-                        --activeConsumers;
-                        break;
-                    case ConsumerState.ReachedEndOfTopic:
-                        _state.SetState(ConsumerState.ReachedEndOfTopic);
-                        return;
-                    case ConsumerState.Faulted:
-                        _state.SetState(ConsumerState.Faulted);
-                        return;
-                    case ConsumerState.Unsubscribed:
-                        _state.SetState(ConsumerState.Unsubscribed);
-                        return;
-                }
-
-                monitoringTasks[i] = 
task.Result.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
-            }
-
-            if (activeConsumers == 0)
-                _state.SetState(ConsumerState.Disconnected);
-            else if (activeConsumers == monitoringTasks.Count)
-                _state.SetState(ConsumerState.Active);
-            else
-                _state.SetState(ConsumerState.PartiallyActive);
-        }
+        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
+        await _channel.DisposeAsync().ConfigureAwait(false);
     }
 
-    private SubConsumer<TMessage> CreateSubConsumer(string topic)
-    {
-        var correlationId = Guid.NewGuid();
-        var consumerName = _options.ConsumerName ?? 
$"Consumer-{correlationId:N}";
+    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
+        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
 
-        var subscribe = new CommandSubscribe
-        {
-            ConsumerName = consumerName,
-            InitialPosition = (CommandSubscribe.InitialPositionType) 
_options.InitialPosition,
-            PriorityLevel = _options.PriorityLevel,
-            ReadCompacted = _options.ReadCompacted,
-            ReplicateSubscriptionState = _options.ReplicateSubscriptionState,
-            Subscription = _options.SubscriptionName,
-            Topic = topic,
-            Type = (CommandSubscribe.SubType) _options.SubscriptionType
-        };
-
-        foreach (var property in _options.SubscriptionProperties)
-        {
-            var keyValue = new KeyValue { Key = property.Key, Value = 
property.Value };
-            subscribe.SubscriptionProperties.Add(keyValue);
-        }
+    public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
+        => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, 
cancellationToken).ConfigureAwait(false);
 
-        var messagePrefetchCount = _options.MessagePrefetchCount;
-        var messageFactory = new MessageFactory<TMessage>(_options.Schema);
-        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
-        var decompressorFactories = 
CompressionFactories.DecompressorFactories();
-
-        var factory = new ConsumerChannelFactory<TMessage>(correlationId, 
_processManager, _connectionPool, subscribe, messagePrefetchCount, 
batchHandler, messageFactory, decompressorFactories);
-        var stateManager = new 
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, 
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
-        var initialChannel = new NotReadyChannel<TMessage>();
-        var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
-        var consumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, 
_options.SubscriptionName, topic, _processManager, initialChannel, executor, 
stateManager, factory);
-        var process = new ConsumerProcess(correlationId, stateManager, 
consumer, _options.SubscriptionType == SubscriptionType.Failover);
-        _processManager.Add(process);
-        process.Start();
-        return consumer;
-    }
+    public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
+        => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
 
-    private async Task<uint> GetNumberOfPartitions(string topic, 
CancellationToken cancellationToken)
+    public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
     {
-        var connection = await _connectionPool.FindConnectionForTopic(topic, 
cancellationToken).ConfigureAwait(false);
-        var commandPartitionedMetadata = new 
PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
-        var response = await connection.Send(commandPartitionedMetadata, 
cancellationToken).ConfigureAwait(false);
-
-        
response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
-
-        if (response.PartitionMetadataResponse.Response == 
PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
-            response.PartitionMetadataResponse.Throw();
-
-        return response.PartitionMetadataResponse.Partitions;
+        var command = new CommandRedeliverUnacknowledgedMessages();
+        command.MessageIds.AddRange(messageIds.Select(messageId => 
messageId.ToMessageIdData()));
+        await _executor.Execute(() => 
InternalRedeliverUnacknowledgedMessages(command, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
-    private void ThrowIfDisposed()
+    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
+        => await 
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), 
cancellationToken).ConfigureAwait(false);
+
+    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
     {
-        if (_isDisposed != 0)
-            throw new ConsumerDisposedException(GetType().FullName!);
+        var unsubscribe = new CommandUnsubscribe();
+        await _executor.Execute(() => InternalUnsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
     }
-    public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken = default)
-    {
-        ThrowIfDisposed();
 
-        var sourceTopic = Topic;
-        if (_isPartitioned)
-        {
-            sourceTopic = getPartitonedTopicName(messageId.Partition);
-        }
-        await _executor.Execute(() =>
-                {
-                    ThrowIfNotActive();
-
-                    using (_lock.ReaderLock())
-                    {
-                        return _consumers[sourceTopic].Acknowledge(messageId, 
cancellationToken);
-                    }
-                }, cancellationToken)
-                .ConfigureAwait(false);
+    public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
+    {
+        var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
+    public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
-        await _executor.Execute(() =>
-            {
-                ThrowIfNotActive();
-
-                using (_lock.ReaderLock())
-                {
-                    var sourceTopic = Topic;
-                    if (_isPartitioned)
-                    {
-                        sourceTopic = 
getPartitonedTopicName(messageId.Partition);
-                    }
-                    return 
_consumers[sourceTopic].AcknowledgeCumulative(messageId, cancellationToken);
-                }
-            }, cancellationToken)
-            .ConfigureAwait(false);
+        var seek = new CommandSeek { MessagePublishTime = publishTime };
+        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
         var getLastMessageId = new CommandGetLastMessageId();
-        return await _executor.Execute(() => 
GetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        return await _executor.Execute(() => 
InternalGetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
     }
 
-    private async ValueTask<MessageId> 
GetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
+    private void Guard()
     {
-        ThrowIfNotActive();
+        if (_isDisposed != 0)
+            throw new ConsumerDisposedException(GetType().FullName!);
 
-        if (_isPartitioned)
-        {
-            throw new NotImplementedException("GetLastMessageId is not 
implemented for partitioned topics");
-        }
-        using (_lock.ReaderLock())
-        {
-            return await 
_consumers.First().Value.GetLastMessageId(cancellationToken).ConfigureAwait(false);
-        }
+        if (_faultException is not null)
+            throw new ConsumerFaultedException(_faultException);
     }
 
-    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken = default)
+    public async Task EstablishNewChannel(CancellationToken cancellationToken)
     {
-        ThrowIfDisposed();
+        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
 
-        return await _executor.Execute(() => 
ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
+        var oldChannel = _channel;
+        if (oldChannel is not null)
+            await oldChannel.DisposeAsync().ConfigureAwait(false);
+
+        _channel = channel;
     }
 
-    private async ValueTask<IMessage<TMessage>> 
ReceiveMessage(CancellationToken cancellationToken)
-    {
-        ThrowIfNotActive();
+    public async ValueTask CloseChannel(CancellationToken cancellationToken)
+        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
 
-        using (_lock.ReaderLock())
-        {
-            if (_messagesQueue.TryDequeue(out var message))
-            {
-                return message;
-            }
-            var cts = new CancellationTokenSource();
-            var linkedCts = 
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
-            while (!cancellationToken.IsCancellationRequested)
-            {
-                var done = false;
-
-                Task<IMessage<TMessage>>[] receiveTasks = 
_consumers.Values.Select(consumer => 
consumer.Receive(linkedCts.Token).AsTask()).ToArray();
-                await Task.WhenAny(receiveTasks).ConfigureAwait(false);
-
-                try
-                {
-                    receiveTasks.Where(t => t.IsCompleted).ToList().ForEach(t 
=>
-                        {
-                            if (t.Result == null)
-                            {
-                                return;
-                            }
-
-                            done = true;
-                            _messagesQueue.Enqueue(t.Result);
-                        }
-                    );
-                }
-                catch (Exception exception)
-                {
-                    if (linkedCts.IsCancellationRequested)
-                    {
-                        cts.Cancel();
-                    }
-                    else
-                    {
-                        throw exception;
-                    }
-                }
-                if (done)
-                {
-                    break;
-                }
-            }
-            cts.Cancel();
-            cts.Dispose();
-            _messagesQueue.TryDequeue(out var result);
-            return result;
-        }
+    public async ValueTask ChannelFaulted(Exception exception)
+    {
+        _faultException = exception;
+        await DisposeChannel().ConfigureAwait(false);
     }
 
-    public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken = default)
+    private async ValueTask InternalAcknowledge(CommandAck command, 
CancellationToken cancellationToken)
     {
-        ThrowIfDisposed();
-
-        await _executor.Execute<ValueTask>(async () =>
-        {
-            ThrowIfNotActive();
-
-            using (_lock.ReaderLock())
-            {
-                var tasks = messageIds.Select(n =>
-                {
-                    var sourceTopic = Topic;
-                    if (_isPartitioned)
-                    {
-                        sourceTopic = getPartitonedTopicName(n.Partition);
-                    }
-                    return 
_consumers[getPartitonedTopicName(n.Partition)].RedeliverUnacknowledgedMessages(cancellationToken).AsTask();
-                });
-                await Task.WhenAll(tasks).ConfigureAwait(false);
-            }
-        }, cancellationToken).ConfigureAwait(false);
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken = default)
+    private async ValueTask 
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages 
command, CancellationToken cancellationToken)
     {
-        ThrowIfDisposed();
-
-        await _executor.Execute<ValueTask>(async () =>
-        {
-            ThrowIfNotActive();
-
-            using (_lock.ReaderLock())
-            {
-                var tasks = _consumers.Values.Select(consumer =>
-                    
consumer.RedeliverUnacknowledgedMessages(cancellationToken).AsTask()
-                );
-                await Task.WhenAll(tasks).ConfigureAwait(false);
-            }
-        }, cancellationToken).ConfigureAwait(false);
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken = default)
+    private async ValueTask<MessageId> 
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
-        await _executor.Execute<ValueTask>(async () =>
-        {
-            ThrowIfNotActive();
-
-            using (_lock.ReaderLock())
-            {
-                if (messageId.Equals(null))
-                {
-                    throw new ArgumentException("Illegal messageId cannot be 
null");
-                }
-
-                var tasks = _consumers.Values.Select(consumer =>
-                    consumer.Seek(messageId, cancellationToken).AsTask()
-                );
-                await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
-                _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
-            }
-        }, cancellationToken).ConfigureAwait(false);
+        Guard();
+        return await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken = default)
+    private async Task InternalSeek(CommandSeek command, CancellationToken 
cancellationToken)
     {
-        ThrowIfDisposed();
-
-        await _executor.Execute<ValueTask>(async () =>
-        {
-            ThrowIfNotActive();
-
-            using (_lock.ReaderLock())
-            {
-                var tasks = _consumers.Values.Select(consumer =>
-                    consumer.Seek(publishTime, cancellationToken).AsTask()
-                );
-                await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);
-                _messagesQueue = new ConcurrentQueue<IMessage<TMessage>>();
-            }
-        }, cancellationToken).ConfigureAwait(false);
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
+    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
     {
-        ThrowIfDisposed();
-
-        await _executor.Execute<ValueTask>(async () =>
-        {
-            ThrowIfNotActive();
-
-            using (_lock.ReaderLock())
-            {
-                var tasks = _consumers.Values.Select(consumer =>
-                    consumer.Unsubscribe(cancellationToken).AsTask()
-                );
-                await Task.WhenAll(tasks).ConfigureAwait(false);
-            }
-        }, cancellationToken).ConfigureAwait(false);
+        Guard();
+        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
     }
 
-    private string getPartitonedTopicName(int partitionNumber)
+    private async ValueTask InternalUnsubscribe(CommandUnsubscribe command, 
CancellationToken cancellationToken)
     {
-        return $"{Topic}-partition-{partitionNumber}";
+        Guard();
+        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
     }
 
-    private void ThrowIfNotActive()
+    private async ValueTask InternalAcknowledge(MessageId messageId, 
CommandAck.AckType ackType, CancellationToken cancellationToken)
     {
-        if (_state.CurrentState != ConsumerState.Active)
-            throw new ConsumerNotActiveException("The consumer is not yet 
activated.");
-    }
+        var commandAck = _commandAckPool.Get();
+        commandAck.Type = ackType;
+        if (commandAck.MessageIds.Count == 0)
+            commandAck.MessageIds.Add(messageId.ToMessageIdData());
+        else
+            commandAck.MessageIds[0].MapFrom(messageId);
 
-    public void Register(IEvent @event) { }
+        try
+        {
+            await _executor.Execute(() => InternalAcknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        }
+        finally
+        {
+            _commandAckPool.Return(commandAck);
+        }
+    }
 }
diff --git a/src/DotPulsar/Internal/ConsumerProcess.cs b/src/DotPulsar/Internal/ConsumerProcess.cs
index aa0f9f6..6c3584a 100644
--- a/src/DotPulsar/Internal/ConsumerProcess.cs
+++ b/src/DotPulsar/Internal/ConsumerProcess.cs
@@ -51,7 +51,7 @@ public sealed class ConsumerProcess : Process
         {
             var formerState = _stateManager.SetState(ConsumerState.Faulted);
             if (formerState != ConsumerState.Faulted)
-                ActionQueue.Enqueue(async _ => await 
_consumer.ChannelFaulted(Exception!) );
+                ActionQueue.Enqueue(async _ => await 
_consumer.ChannelFaulted(Exception!));
             return;
         }
 
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs 
b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index 727d8a7..edb90fe 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -44,7 +44,6 @@ public sealed class DefaultExceptionHandler : IHandleException
             ServiceNotReadyException _ => FaultAction.Retry,
             MetadataException _ => FaultAction.Rethrow,
             ConsumerNotFoundException _ => FaultAction.Retry,
-            ConsumerNotActiveException _ => FaultAction.Retry,
             ConsumerBusyException _ => FaultAction.Retry,
             ProducerBusyException _ => FaultAction.Retry,
             ProducerFencedException _ => FaultAction.Rethrow,
diff --git a/src/DotPulsar/Internal/Producer.cs 
b/src/DotPulsar/Internal/Producer.cs
index dedb8f9..66c04e1 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -130,7 +130,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
         Interlocked.Exchange(ref _producerCount, monitoringTasks.Length);
 
         var connectedProducers = 0;
-        bool[] waitingForExclusive = new bool[isPartitionedTopic ? 
numberOfPartitions : 1];
+        var waitingForExclusive = new bool[isPartitionedTopic ? 
numberOfPartitions : 1];
 
         while (true)
         {
@@ -386,7 +386,7 @@ public sealed class Producer<TMessage> : 
IProducer<TMessage>, IRegisterEvent
     }
 
     private StateManager<ProducerState> CreateStateManager()
-        => new (ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted, ProducerState.Fenced);
+        => new(ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted, ProducerState.Fenced);
 
     public void Register(IEvent @event) { }
 }
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs 
b/src/DotPulsar/Internal/ProducerBuilder.cs
index 647f514..dbdac4e 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -42,7 +42,6 @@ public sealed class ProducerBuilder<TMessage> : 
IProducerBuilder<TMessage>
         _producerAccessMode = 
ProducerOptions<TMessage>.DefaultProducerAccessMode;
     }
 
-
     public IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool 
attachTraceInfoToMessages)
     {
         _attachTraceInfoToMessages = attachTraceInfoToMessages;
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs 
b/src/DotPulsar/Internal/ProducerProcess.cs
index f2db79f..fc25af7 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -47,7 +47,7 @@ public sealed class ProducerProcess : Process
 
         if (ExecutorState == ExecutorState.Faulted)
         {
-            ProducerState newState = Exception! is ProducerFencedException ? 
ProducerState.Fenced : ProducerState.Faulted;
+            var newState = Exception! is ProducerFencedException ? 
ProducerState.Fenced : ProducerState.Faulted;
             var formerState = _stateManager.SetState(newState);
             if (formerState != ProducerState.Faulted && formerState != 
ProducerState.Fenced)
                 ActionQueue.Enqueue(async _ => await 
_producer.ChannelFaulted(Exception!));
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs 
b/src/DotPulsar/Internal/ReaderProcess.cs
index d904710..09ad54f 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -48,7 +48,7 @@ public sealed class ReaderProcess : Process
         {
             var formerState = _stateManager.SetState(ReaderState.Faulted);
             if (formerState != ReaderState.Faulted)
-                ActionQueue.Enqueue(async _ => await 
_reader.ChannelFaulted(Exception!) );
+                ActionQueue.Enqueue(async _ => await 
_reader.ChannelFaulted(Exception!));
             return;
         }
 
diff --git a/src/DotPulsar/Internal/SubConsumer.cs 
b/src/DotPulsar/Internal/SubConsumer.cs
deleted file mode 100644
index bbc9bd3..0000000
--- a/src/DotPulsar/Internal/SubConsumer.cs
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal;
-
-using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
-using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
-using DotPulsar.Internal.Extensions;
-using DotPulsar.Internal.PulsarApi;
-using Microsoft.Extensions.ObjectPool;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-
-public sealed class SubConsumer<TMessage> : IContainsChannel, 
IConsumer<TMessage>
-{
-    private readonly Guid _correlationId;
-    private readonly IRegisterEvent _eventRegister;
-    private IConsumerChannel<TMessage> _channel;
-    private readonly IExecute _executor;
-    private readonly IStateChanged<ConsumerState> _state;
-    private readonly IConsumerChannelFactory<TMessage> _factory;
-    private Exception? _faultException;
-    private readonly ObjectPool<CommandAck> _commandAckPool;
-
-    public Uri ServiceUrl { get; }
-    public string SubscriptionName { get; }
-    public string Topic { get; }
-    private int _isDisposed;
-
-    public SubConsumer(
-        Guid correlationId,
-        Uri serviceUrl,
-        string subscriptionName,
-        string topic,
-        IRegisterEvent eventRegister,
-        IConsumerChannel<TMessage> initialChannel,
-        IExecute executor,
-        IStateChanged<ConsumerState> state,
-        IConsumerChannelFactory<TMessage> factory
-    )
-    {
-        _correlationId = correlationId;
-        ServiceUrl = serviceUrl;
-        SubscriptionName = subscriptionName;
-        Topic = topic;
-        _eventRegister = eventRegister;
-        _channel = initialChannel;
-        _executor = executor;
-        _state = state;
-        _factory = factory;
-        _isDisposed = 0;
-        _commandAckPool = new DefaultObjectPool<CommandAck>(new 
DefaultPooledObjectPolicy<CommandAck>());
-
-        _eventRegister.Register(new ConsumerCreated(_correlationId));
-    }
-
-    public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
-          => await InternalAcknowledge(messageId, 
CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
-
-    private async ValueTask InternalAcknowledge(MessageId messageId, 
CommandAck.AckType ackType, CancellationToken cancellationToken)
-    {
-        var commandAck = _commandAckPool.Get();
-        commandAck.Type = ackType;
-        if (commandAck.MessageIds.Count == 0)
-            commandAck.MessageIds.Add(messageId.ToMessageIdData());
-        else
-            commandAck.MessageIds[0].MapFrom(messageId);
-
-        try
-        {
-            await _executor.Execute(() => InternalAcknowledge(commandAck, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-        }
-        finally
-        {
-            _commandAckPool.Return(commandAck);
-        }
-    }
-
-    private async ValueTask InternalAcknowledge(CommandAck command, 
CancellationToken cancellationToken)
-    {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
-    }
-
-    public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask ChannelFaulted(Exception exception)
-    {
-        _faultException = exception;
-        await DisposeChannel().ConfigureAwait(false);
-    }
-
-    public async ValueTask CloseChannel(CancellationToken cancellationToken)
-        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask DisposeAsync()
-    {
-        if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
-            return;
-
-        _eventRegister.Register(new ConsumerDisposed(_correlationId));
-        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-        await _channel.DisposeAsync().ConfigureAwait(false);
-    }
-
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
-    {
-        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
-        var oldChannel = _channel;
-        if (oldChannel is not null)
-            await oldChannel.DisposeAsync().ConfigureAwait(false);
-
-        _channel = channel;
-    }
-
-    public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
-    {
-        Guard();
-        var getLastMessageId = new CommandGetLastMessageId();
-        return await _channel.Send(getLastMessageId, 
cancellationToken).ConfigureAwait(false);
-    }
-
-    public bool IsFinalState()
-        => _state.IsFinalState();
-
-    public bool IsFinalState(ConsumerState state)
-        => _state.IsFinalState(state);
-
-    public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState 
state, CancellationToken cancellationToken = default)
-        => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
-
-
-    public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, 
CancellationToken cancellationToken = default)
-        => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
-        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
-
-    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
-    {
-        Guard();
-        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
-    }
-
-    public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
-    {
-        var command = new CommandRedeliverUnacknowledgedMessages();
-        command.MessageIds.AddRange(messageIds.Select(messageId => 
messageId.ToMessageIdData()));
-        await _executor.Execute(() => 
InternalRedeliverUnacknowledgedMessages(command, cancellationToken), 
cancellationToken).ConfigureAwait(false);
-    }
-
-    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
-        => await 
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), 
cancellationToken).ConfigureAwait(false);
-
-    private async ValueTask 
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages 
command, CancellationToken cancellationToken)
-    {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
-    }
-
-    public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken = default)
-    {
-        var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
-    }
-
-    public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken = default)
-    {
-        var seek = new CommandSeek { MessagePublishTime = publishTime };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
-    }
-
-    private async Task InternalSeek(CommandSeek command, CancellationToken 
cancellationToken)
-    {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
-    }
-
-    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
-    {
-        Guard();
-        var unsubscribe = new CommandUnsubscribe();
-        await _executor.Execute(() => Unsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
-    }
-
-    private async ValueTask Unsubscribe(CommandUnsubscribe command, 
CancellationToken cancellationToken)
-        => await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
-
-    private void Guard()
-    {
-        if (_isDisposed != 0)
-            throw new ConsumerDisposedException(GetType().FullName!);
-
-        if (_faultException is not null)
-            throw new ConsumerFaultedException(_faultException);
-    }
-
-    private async ValueTask DisposeChannel()
-    {
-        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-        await _channel.DisposeAsync().ConfigureAwait(false);
-    }
-}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 14085f1..66284ef 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -89,12 +89,43 @@ public sealed class PulsarClient : IPulsarClient
     {
         ThrowIfDisposed();
 
-        var consumer = new Consumer<TMessage>(ServiceUrl, options, 
_processManager, _exceptionHandler, _connectionPool);
+        var correlationId = Guid.NewGuid();
+        var consumerName = options.ConsumerName ?? 
$"Consumer-{correlationId:N}";
+        var subscribe = new CommandSubscribe
+        {
+            ConsumerName = consumerName,
+            InitialPosition = (CommandSubscribe.InitialPositionType) 
options.InitialPosition,
+            PriorityLevel = options.PriorityLevel,
+            ReadCompacted = options.ReadCompacted,
+            ReplicateSubscriptionState = options.ReplicateSubscriptionState,
+            Subscription = options.SubscriptionName,
+            Topic = options.Topic,
+            Type = (CommandSubscribe.SubType) options.SubscriptionType
+        };
 
+        foreach (var property in options.SubscriptionProperties)
+        {
+            var keyValue = new KeyValue { Key = property.Key, Value = 
property.Value };
+            subscribe.SubscriptionProperties.Add(keyValue);
+        }
+
+        var messagePrefetchCount = options.MessagePrefetchCount;
+        var messageFactory = new MessageFactory<TMessage>(options.Schema);
+        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
+        var decompressorFactories = 
CompressionFactories.DecompressorFactories();
+        var factory = new ConsumerChannelFactory<TMessage>(correlationId, 
_processManager, _connectionPool, subscribe, messagePrefetchCount, 
batchHandler, messageFactory, decompressorFactories);
+        var stateManager = new 
StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, 
ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+        var initialChannel = new NotReadyChannel<TMessage>();
+        var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
+        var consumer = new Consumer<TMessage>(correlationId, ServiceUrl, 
options.SubscriptionName, options.Topic, _processManager, initialChannel, 
executor, stateManager, factory);
         if (options.StateChangedHandler is not null)
             _ = StateMonitor.MonitorConsumer(consumer, 
options.StateChangedHandler);
+        var process = new ConsumerProcess(correlationId, stateManager, 
consumer, options.SubscriptionType == SubscriptionType.Failover);
+        _processManager.Add(process);
+        process.Start();
         return consumer;
     }
+
     /// <summary>
     /// Create a reader.
     /// </summary>
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs 
b/tests/DotPulsar.Tests/ConsumerTests.cs
index bbb23f7..fbf43c8 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -70,32 +70,6 @@ public class ConsumerTests
         consumed.Should().BeEquivalentTo(produced);
     }
 
-    [Fact]
-    public async Task 
PartitionConsume_WhenGetLastMessageId_ThenShouldThrowException()
-    {
-        //Arrange
-        var testRunId = Guid.NewGuid().ToString("N");
-        const int partitions = 3;
-        var topicName = $"consumer-tests-{testRunId}";
-
-        
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}", 
partitions);
-
-        //Act 
-        await using var client = PulsarClient.Builder()
-            .ServiceUrl(_fixture.ServiceUrl)
-            .Authentication(AuthenticationFactory.Token(ct => 
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
-            .Build();
-
-        var consumer = client.NewConsumer(Schema.ByteArray)
-            .ConsumerName($"consumer-{testRunId}")
-            .InitialPosition(SubscriptionInitialPosition.Earliest)
-            .SubscriptionName($"subscription-{testRunId}")
-            .Topic(topicName)
-            .Create();
-        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
-        Assert.ThrowsAsync<NotImplementedException>(async () => await 
consumer.GetLastMessageId(cts.Token));
-    }
-
     private static async Task<IEnumerable<MessageId>> 
ProduceMessages(IProducer<byte[]> producer, int numberOfMessages, 
CancellationToken ct)
     {
         var messageIds = new MessageId[numberOfMessages];
diff --git a/tests/DotPulsar.Tests/ProducerTests.cs 
b/tests/DotPulsar.Tests/ProducerTests.cs
index db2b1ce..ed54195 100644
--- a/tests/DotPulsar.Tests/ProducerTests.cs
+++ b/tests/DotPulsar.Tests/ProducerTests.cs
@@ -175,7 +175,6 @@ public class ProducerTests
     [InlineData(ProducerAccessMode.Shared, ProducerAccessMode.Exclusive, 
ProducerState.Connected, ProducerState.Fenced)]
     [InlineData(ProducerAccessMode.Shared, 
ProducerAccessMode.WaitForExclusive, ProducerState.Connected, 
ProducerState.WaitingForExclusive)]
     [InlineData(ProducerAccessMode.Exclusive, 
ProducerAccessMode.WaitForExclusive, ProducerState.Connected, 
ProducerState.WaitingForExclusive)]
-
     public async Task 
TwoProducers_WhenUsingDifferentAccessModes_ThenGoToExpectedStates(ProducerAccessMode
 accessMode1, ProducerAccessMode accessMode2, ProducerState producerState1, 
ProducerState producerState2)
     {
         //Arrange

Reply via email to