entvex commented on code in PR #162:
URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1253114094


##########
src/DotPulsar/Internal/Consumer.cs:
##########
@@ -86,140 +133,191 @@ public async ValueTask DisposeAsync()
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
-        _eventRegister.Register(new ConsumerDisposed(_correlationId));
-        await DisposeChannel().ConfigureAwait(false);
+        foreach (var subConsumer in _subConsumers.Values)
+        {
+            await subConsumer.DisposeAsync().ConfigureAwait(false);
+        }
     }
 
-    private async ValueTask DisposeChannel()
+    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
     {
-        await 
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-        await _channel.DisposeAsync().ConfigureAwait(false);
+        var iterations = 0;
+        while (_subConsumerIndex < _subConsumers.Count)
+        {
+            _subConsumerIndex++;
+            iterations++;
+            if (_subConsumerIndex == _subConsumers.Count)
+                _subConsumerIndex = 0;
+
+            var receiveTask = 
_receiveTaskQueueForSubConsumers[_subConsumerIndex];
+            if (receiveTask == null)
+            {
+                var receiveTaskValueTask = 
_subConsumers.ElementAt(_subConsumerIndex).Value.Receive(cancellationToken);
+
+                if (receiveTaskValueTask.IsCompleted)
+                {
+                    return receiveTaskValueTask.Result;
+                }
+                {
+                    _receiveTaskQueueForSubConsumers[_subConsumerIndex] = 
receiveTaskValueTask.AsTask(); //.AsTask
+                }
+            }
+            else
+            {
+                if (receiveTask.IsCompleted)
+                {
+                    _receiveTaskQueueForSubConsumers[_subConsumerIndex] = null;
+                    return receiveTask.Result;
+                }
+            }
+            if (iterations == _subConsumers.Count)
+                await 
Task.WhenAny(_receiveTaskQueueForSubConsumers).ConfigureAwait(false);
+        }
+        return null!; //TODO should never reach here. Should we throw an exception or something ?
     }
 
-    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken 
cancellationToken)
-        => await _executor.Execute(() => InternalReceive(cancellationToken), 
cancellationToken).ConfigureAwait(false);
-
     public async ValueTask Acknowledge(MessageId messageId, CancellationToken 
cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, 
cancellationToken).ConfigureAwait(false);
+    {
+        var sourceTopic = Topic;
+        if (_isPartitioned)
+        {
+            sourceTopic = GetPartitionedTopicName(messageId.Partition);
+        }
+        await _subConsumers[sourceTopic].Acknowledge(messageId, 
cancellationToken).ConfigureAwait(false);
+    }
 
     public async ValueTask AcknowledgeCumulative(MessageId messageId, 
CancellationToken cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, 
cancellationToken).ConfigureAwait(false);
+    {
+        var sourceTopic = Topic;
+        if (_isPartitioned)
+        {
+            sourceTopic = GetPartitionedTopicName(messageId.Partition);
+        }
+        await _subConsumers[sourceTopic].AcknowledgeCumulative(messageId, 
cancellationToken).ConfigureAwait(false);
+    }
 
     public async ValueTask 
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, 
CancellationToken cancellationToken)
     {
-        var command = new CommandRedeliverUnacknowledgedMessages();
-        command.MessageIds.AddRange(messageIds.Select(messageId => 
messageId.ToMessageIdData()));
-        await _executor.Execute(() => 
InternalRedeliverUnacknowledgedMessages(command, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        var enumerable = messageIds as MessageId[] ?? messageIds.ToArray();
+        foreach (var messageId in enumerable)
+        {
+            var sourceTopic = Topic;
+            if (_isPartitioned)
+            {
+                sourceTopic = GetPartitionedTopicName(messageId.Partition);
+            }
+            await 
_subConsumers[sourceTopic].RedeliverUnacknowledgedMessages(enumerable, 
cancellationToken).ConfigureAwait(false);
+        }
     }
 
     public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken 
cancellationToken)
-        => await 
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), 
cancellationToken).ConfigureAwait(false);
+    {
+        foreach (var subConsumer in _subConsumers.Values)
+        {
+            await 
subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
+        }
+    }
 
     public async ValueTask Unsubscribe(CancellationToken cancellationToken)
     {
-        var unsubscribe = new CommandUnsubscribe();
-        await _executor.Execute(() => InternalUnsubscribe(unsubscribe, 
cancellationToken), cancellationToken).ConfigureAwait(false);
+        foreach (var subConsumer in _subConsumers.Values)
+        {
+            await 
subConsumer.Unsubscribe(cancellationToken).ConfigureAwait(false);
+        }
     }
 
     public async ValueTask Seek(MessageId messageId, CancellationToken 
cancellationToken)
     {
-        var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        var sourceTopic = Topic;
+        if (_isPartitioned)
+        {
+            sourceTopic = GetPartitionedTopicName(messageId.Partition);
+        }
+        await _subConsumers[sourceTopic].Seek(messageId, 
cancellationToken).ConfigureAwait(false);
     }
 
     public async ValueTask Seek(ulong publishTime, CancellationToken 
cancellationToken)
     {
-        var seek = new CommandSeek { MessagePublishTime = publishTime };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), 
cancellationToken).ConfigureAwait(false);
+        var subConsumerSeekTask = new List<Task>();
+        foreach (var subConsumer in _subConsumers.Values)
+        {
+            subConsumerSeekTask.Add(subConsumer.Seek(publishTime, 
cancellationToken).AsTask());
+        }
+        await Task.WhenAll(subConsumerSeekTask).ConfigureAwait(false);
     }
 
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken 
cancellationToken)
     {
-        var getLastMessageId = new CommandGetLastMessageId();
-        return await _executor.Execute(() => 
InternalGetLastMessageId(getLastMessageId, cancellationToken), 
cancellationToken).ConfigureAwait(false);
-    }
-
-    private void Guard()
-    {
-        if (_isDisposed != 0)
-            throw new ConsumerDisposedException(GetType().FullName!);
-
-        if (_faultException is not null)
-            throw new ConsumerFaultedException(_faultException);
-    }
-
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
-    {
-        var channel = await _executor.Execute(() => 
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
-        var oldChannel = _channel;
-        if (oldChannel is not null)
-            await oldChannel.DisposeAsync().ConfigureAwait(false);
-
-        _channel = channel;
+        return await 
_subConsumers[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+        //var getLastMessageId = new CommandGetLastMessageId();
+        //return await _executor.Execute(() => InternalGetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask CloseChannel(CancellationToken cancellationToken)
-        => await 
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask ChannelFaulted(Exception exception)
+    private SubConsumer<TMessage> CreateSubConsumer(string topic)
     {
-        _faultException = exception;
-        await DisposeChannel().ConfigureAwait(false);
-    }
+        var correlationId = Guid.NewGuid();
+        var consumerName = _options.ConsumerName ?? 
$"Consumer-{correlationId:N}";
 
-    private async ValueTask InternalAcknowledge(CommandAck command, 
CancellationToken cancellationToken)
-    {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
-    }
+        var subscribe = new CommandSubscribe
+        {
+            ConsumerName = consumerName,
+            InitialPosition = (CommandSubscribe.InitialPositionType) 
_options.InitialPosition,
+            PriorityLevel = _options.PriorityLevel,
+            ReadCompacted = _options.ReadCompacted,
+            ReplicateSubscriptionState = _options.ReplicateSubscriptionState,
+            Subscription = _options.SubscriptionName,
+            Topic = topic,
+            Type = (CommandSubscribe.SubType) _options.SubscriptionType
+        };
+
+        foreach (var property in _options.SubscriptionProperties)
+        {
+            var keyValue = new KeyValue { Key = property.Key, Value = 
property.Value };
+            subscribe.SubscriptionProperties.Add(keyValue);
+        }
 
-    private async ValueTask 
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages 
command, CancellationToken cancellationToken)
-    {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        var messagePrefetchCount = _options.MessagePrefetchCount;
+        var messageFactory = new MessageFactory<TMessage>(_options.Schema);
+        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
+        var decompressorFactories = 
CompressionFactories.DecompressorFactories();
+        var consumerChannelFactory = new 
ConsumerChannelFactory<TMessage>(correlationId, _processManager, 
_connectionPool, subscribe,
+            messagePrefetchCount, batchHandler, messageFactory, 
decompressorFactories);
+        var stateManager = CreateStateManager();
+        var initialChannel = new NotReadyChannel<TMessage>();
+        var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
+
+        var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, 
_options.SubscriptionName, _options.Topic,
+            _processManager, initialChannel, executor, stateManager, 
consumerChannelFactory);
+
+        if (_options.StateChangedHandler is not null)
+            _ = StateMonitor.MonitorConsumer(subConsumer, 
_options.StateChangedHandler);
+
+        var process = new ConsumerProcess(correlationId, stateManager, 
subConsumer, _options.SubscriptionType == SubscriptionType.Failover);
+        _processManager.Add(process);
+        process.Start();
+        return subConsumer;
     }
-
-    private async ValueTask<MessageId> 
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken 
cancellationToken)
+    private string GetPartitionedTopicName(int partitionNumber)
     {
-        Guard();
-        return await _channel.Send(command, 
cancellationToken).ConfigureAwait(false);
+        return $"{Topic}-partition-{partitionNumber}";
     }
-
-    private async Task InternalSeek(CommandSeek command, CancellationToken 
cancellationToken)
+    private StateManager<ConsumerState> CreateStateManager()
     {
-        Guard();
-        await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+        return new StateManager<ConsumerState>(ConsumerState.Disconnected, 
ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
     }
-
-    private async ValueTask<IMessage<TMessage>> 
InternalReceive(CancellationToken cancellationToken)
+    private async Task<uint> GetNumberOfPartitions(CancellationToken 
cancellationToken)
     {
-        Guard();
-        return await _channel.Receive(cancellationToken).ConfigureAwait(false);
-    }
+        //TODO guard for null if cancellationToken is not set.

Review Comment:
   Just remove this TODO comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to