blankensteiner commented on code in PR #162:
URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1286877302
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -15,49 +15,140 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
+using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-public sealed class Reader<TMessage> : IContainsChannel, IReader<TMessage>
+public sealed class Reader<TMessage> : IReader<TMessage>
{
- private readonly Guid _correlationId;
- private readonly IRegisterEvent _eventRegister;
- private IConsumerChannel<TMessage> _channel;
+ private readonly TaskCompletionSource<IMessage<TMessage>>
_emptyTaskCompletionSource;
+ private readonly ReaderOptions<TMessage> _readerOptions;
+ private readonly ConcurrentDictionary<string, SubReader<TMessage>>
_subReaders;
+ private readonly IHandleException _exceptionHandler;
+ private readonly IConnectionPool _connectionPool;
+ private readonly ProcessManager _processManager;
+ private readonly CancellationTokenSource _cts;
private readonly IExecute _executor;
- private readonly IStateChanged<ReaderState> _state;
- private readonly IConsumerChannelFactory<TMessage> _factory;
+ private readonly StateManager<ReaderState> _state;
+ private readonly SemaphoreSlim _semaphoreSlim;
+ private bool _allSubReadersIsReady;
+ private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubReaders;
+ private int _subReaderIndex;
+ private bool _isPartitioned;
+ private uint _numberOfPartitions;
private int _isDisposed;
private Exception? _faultException;
public Uri ServiceUrl { get; }
public string Topic { get; }
public Reader(
- Guid correlationId,
Uri serviceUrl,
- string topic,
- IRegisterEvent eventRegister,
- IConsumerChannel<TMessage> initialChannel,
- IExecute executor,
- IStateChanged<ReaderState> state,
- IConsumerChannelFactory<TMessage> factory)
+ ReaderOptions<TMessage> readerOptions,
+ ProcessManager processManager,
+ IHandleException exceptionHandler,
+ IConnectionPool connectionPool)
{
- _correlationId = correlationId;
ServiceUrl = serviceUrl;
- Topic = topic;
- _eventRegister = eventRegister;
- _channel = initialChannel;
- _executor = executor;
- _state = state;
- _factory = factory;
+ Topic = readerOptions.Topic;
+ _readerOptions = readerOptions;
+ _connectionPool = connectionPool;
+ _processManager = processManager;
+ _exceptionHandler = exceptionHandler;
+ _semaphoreSlim = new SemaphoreSlim(1);
+ _state = CreateStateManager();
+ _receiveTaskQueueForSubReaders =
Array.Empty<Task<IMessage<TMessage>>>();
+ _subReaders = new ConcurrentDictionary<string, SubReader<TMessage>>();
+ _cts = new CancellationTokenSource();
+ _executor = new Executor(Guid.Empty, _processManager,
_exceptionHandler);
_isDisposed = 0;
- _eventRegister.Register(new ReaderCreated(_correlationId));
+ _emptyTaskCompletionSource = new
TaskCompletionSource<IMessage<TMessage>>();
+
+ _ = Setup();
+ }
+
+ private async Task Setup()
+ {
+ try
+ {
+ await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+ }
+ catch (Exception exception)
+ {
+ if (_cts.IsCancellationRequested)
+ return;
+
+ _faultException = exception;
+ _state.SetState(ReaderState.Faulted);
+ }
+ }
+
+ private async Task Monitor()
+ {
+ await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
+ _numberOfPartitions = await
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+ _isPartitioned = _numberOfPartitions != 0;
+
+ _receiveTaskQueueForSubReaders = new
Task<IMessage<TMessage>>[_numberOfPartitions];
+ for (var i = 0; i < _receiveTaskQueueForSubReaders.Length; i++)
+ {
+ _receiveTaskQueueForSubReaders[i] =
_emptyTaskCompletionSource.Task;
+ }
+
+ if (_isPartitioned)
+ {
+ for (var partition = 0; partition < _numberOfPartitions;
partition++)
+ {
+ var partitionedTopicName = GetPartitionedTopicName(partition);
+
+ var subReader = CreateSubReader(partitionedTopicName);
+ _ = _subReaders.TryAdd(partitionedTopicName, subReader);
+ }
+ }
+ else
+ {
+ var subReader = CreateSubReader(Topic);
+ _ = _subReaders.TryAdd(Topic, subReader);
+ }
+ _allSubReadersIsReady = true;
Review Comment:
Is=Are :-)
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
Review Comment:
Use var
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -15,49 +15,140 @@
namespace DotPulsar.Internal;
using DotPulsar.Abstractions;
-using DotPulsar.Exceptions;
using DotPulsar.Internal.Abstractions;
-using DotPulsar.Internal.Events;
+using DotPulsar.Internal.Compression;
using DotPulsar.Internal.PulsarApi;
using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-public sealed class Reader<TMessage> : IContainsChannel, IReader<TMessage>
+public sealed class Reader<TMessage> : IReader<TMessage>
{
- private readonly Guid _correlationId;
- private readonly IRegisterEvent _eventRegister;
- private IConsumerChannel<TMessage> _channel;
+ private readonly TaskCompletionSource<IMessage<TMessage>>
_emptyTaskCompletionSource;
+ private readonly ReaderOptions<TMessage> _readerOptions;
+ private readonly ConcurrentDictionary<string, SubReader<TMessage>>
_subReaders;
+ private readonly IHandleException _exceptionHandler;
+ private readonly IConnectionPool _connectionPool;
+ private readonly ProcessManager _processManager;
+ private readonly CancellationTokenSource _cts;
private readonly IExecute _executor;
- private readonly IStateChanged<ReaderState> _state;
- private readonly IConsumerChannelFactory<TMessage> _factory;
+ private readonly StateManager<ReaderState> _state;
+ private readonly SemaphoreSlim _semaphoreSlim;
+ private bool _allSubReadersIsReady;
+ private Task<IMessage<TMessage>>[] _receiveTaskQueueForSubReaders;
+ private int _subReaderIndex;
+ private bool _isPartitioned;
+ private uint _numberOfPartitions;
private int _isDisposed;
private Exception? _faultException;
public Uri ServiceUrl { get; }
public string Topic { get; }
public Reader(
- Guid correlationId,
Uri serviceUrl,
- string topic,
- IRegisterEvent eventRegister,
- IConsumerChannel<TMessage> initialChannel,
- IExecute executor,
- IStateChanged<ReaderState> state,
- IConsumerChannelFactory<TMessage> factory)
+ ReaderOptions<TMessage> readerOptions,
+ ProcessManager processManager,
+ IHandleException exceptionHandler,
+ IConnectionPool connectionPool)
{
- _correlationId = correlationId;
ServiceUrl = serviceUrl;
- Topic = topic;
- _eventRegister = eventRegister;
- _channel = initialChannel;
- _executor = executor;
- _state = state;
- _factory = factory;
+ Topic = readerOptions.Topic;
+ _readerOptions = readerOptions;
+ _connectionPool = connectionPool;
+ _processManager = processManager;
+ _exceptionHandler = exceptionHandler;
+ _semaphoreSlim = new SemaphoreSlim(1);
+ _state = CreateStateManager();
+ _receiveTaskQueueForSubReaders =
Array.Empty<Task<IMessage<TMessage>>>();
+ _subReaders = new ConcurrentDictionary<string, SubReader<TMessage>>();
+ _cts = new CancellationTokenSource();
+ _executor = new Executor(Guid.Empty, _processManager,
_exceptionHandler);
_isDisposed = 0;
- _eventRegister.Register(new ReaderCreated(_correlationId));
+ _emptyTaskCompletionSource = new
TaskCompletionSource<IMessage<TMessage>>();
+
+ _ = Setup();
+ }
+
+ private async Task Setup()
+ {
+ try
+ {
+ await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+ }
+ catch (Exception exception)
+ {
+ if (_cts.IsCancellationRequested)
+ return;
+
+ _faultException = exception;
+ _state.SetState(ReaderState.Faulted);
+ }
+ }
+
+ private async Task Monitor()
+ {
+ await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
+ _numberOfPartitions = await
_connectionPool.GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+ _isPartitioned = _numberOfPartitions != 0;
+
+ _receiveTaskQueueForSubReaders = new
Task<IMessage<TMessage>>[_numberOfPartitions];
+ for (var i = 0; i < _receiveTaskQueueForSubReaders.Length; i++)
+ {
+ _receiveTaskQueueForSubReaders[i] =
_emptyTaskCompletionSource.Task;
+ }
+
+ if (_isPartitioned)
+ {
+ for (var partition = 0; partition < _numberOfPartitions;
partition++)
+ {
+ var partitionedTopicName = GetPartitionedTopicName(partition);
+
Review Comment:
Delete empty space
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
Review Comment:
Iterate on _subReaders.Values instead
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
+ messageIds[0] = await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ return messageIds;
+ }
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var iterations = 0;
+ while (true)
+ {
+ _subReaderIndex++;
+ iterations++;
+ if (_subReaderIndex == _subReaders.Count)
+ _subReaderIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubReaders[_subReaderIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken);
Review Comment:
ElementAt is slow, we must find a better solution. Let's have a Teams chat
about this.
##########
tests/DotPulsar.Tests/ReaderTests.cs:
##########
@@ -0,0 +1,291 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using FluentAssertions;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+[Collection("Integration"), Trait("Category", "Integration")]
+public class ReaderTests
+{
+ private readonly IntegrationFixture _fixture;
+ private readonly ITestOutputHelper _testOutputHelper;
+
+ public ReaderTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
+ {
+ _fixture = fixture;
+ _testOutputHelper = testOutputHelper;
+ }
+
+ [Fact]
+ public async Task Receive_GivenTopicWithMessages_ShouldConsumeAll()
Review Comment:
ShouldReceiveAll
##########
src/DotPulsar/Internal/SubProducer.cs:
##########
@@ -201,7 +205,15 @@ private void ProcessReceipt(CommandSendReceipt sendReceipt)
_sendQueue.Dequeue();
var srMsgId = sendReceipt.MessageId;
- var messageId = _partition == -1 ? srMsgId.ToMessageId() : new
MessageId(srMsgId.LedgerId, srMsgId.EntryId, _partition, srMsgId.BatchIndex);
+ MessageId messageId;
+ if (_partition == -1)
Review Comment:
No need for an if-else, since the result is the same
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
Review Comment:
Use var
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
Review Comment:
Iterate on getLasMessageIds instead
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -70,6 +70,154 @@ public async Task
Messages_GivenTopicWithMessages_ShouldConsumeAll(int numberOfM
consumed.Should().BeEquivalentTo(produced);
}
+ [Fact]
+ public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ var topicName =
$"persistent://public/default/consumer-tests-{testRunId}";
+
+ _fixture.CreatePartitionedTopic(topicName, partitions);
+
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .Build();
+
+ var consumer = client.NewConsumer(Schema.ByteArray)
+ .ConsumerName($"consumer-{testRunId}")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName($"subscription-{testRunId}")
+ .Topic(topicName)
+ .Create();
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId().AsTask());
+
+ //Assert
+ exception.Should().BeOfType<NotSupportedException>();
+ }
+
+ [Fact]
+ public async Task
Receive_GivenPartitionedTopicWithMessages_ShouldReceiveAll()
Review Comment:
How is this different than the first test? If we are again testing if all
messages in partitioned topics are received?
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
+ messageIds[0] = await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ return messageIds;
+ }
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var iterations = 0;
+ while (true)
+ {
+ _subReaderIndex++;
+ iterations++;
+ if (_subReaderIndex == _subReaders.Count)
+ _subReaderIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubReaders[_subReaderIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken);
+
+ if (receiveTaskValueTask.IsCompleted)
+ return receiveTaskValueTask.Result;
+
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
receiveTaskValueTask.AsTask();
+ }
+ else
+ {
+ if (receiveTask.IsCompleted)
+ {
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
_emptyTaskCompletionSource.Task;
+ return receiveTask.Result;
+ }
+ }
+ if (iterations == _subReaders.Count)
+ await
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ return await
_subReaders[Topic].Receive(cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
Review Comment:
Iterate on _subReaders.Values
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
Review Comment:
Use var
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
+ messageIds[0] = await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ return messageIds;
+ }
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var iterations = 0;
+ while (true)
+ {
+ _subReaderIndex++;
+ iterations++;
+ if (_subReaderIndex == _subReaders.Count)
+ _subReaderIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubReaders[_subReaderIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken);
+
+ if (receiveTaskValueTask.IsCompleted)
+ return receiveTaskValueTask.Result;
+
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
receiveTaskValueTask.AsTask();
+ }
+ else
+ {
+ if (receiveTask.IsCompleted)
+ {
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
_emptyTaskCompletionSource.Task;
+ return receiveTask.Result;
+ }
+ }
+ if (iterations == _subReaders.Count)
+ await
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ return await
_subReaders[Topic].Receive(cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] = _subReaders.Values.ElementAt(i).Seek(messageId,
cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(messageId,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
Review Comment:
Iterate on _subReaders.Values
##########
src/DotPulsar/Internal/Reader.cs:
##########
@@ -72,86 +163,158 @@ public bool IsFinalState()
public bool IsFinalState(ReaderState state)
=> _state.IsFinalState(state);
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
+ }
+ else
+ {
+ return await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ }
}
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ Task<MessageId>[] getLastMessageIdsTasks = new
Task<MessageId>[_numberOfPartitions];
+
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ var getLastMessageIdTask =
_subReaders.Values.ElementAt(i).GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks[i] = getLastMessageIdTask.AsTask();
+ }
+
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
+
+ //collect MessageIds
+ List<MessageId> messageIds = new List<MessageId>();
+ for (var i = 0; i < _subReaders.Count; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
+ }
+ else
+ {
+ MessageId[] messageIds = new MessageId[1];
+ messageIds[0] = await
_subReaders[Topic].GetLastMessageId(cancellationToken).ConfigureAwait(false);
+ return messageIds;
+ }
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
{
- Guard();
- return await _channel.Receive(cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var iterations = 0;
+ while (true)
+ {
+ _subReaderIndex++;
+ iterations++;
+ if (_subReaderIndex == _subReaders.Count)
+ _subReaderIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubReaders[_subReaderIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subReaders.ElementAt(_subReaderIndex).Value.Receive(cancellationToken);
+
+ if (receiveTaskValueTask.IsCompleted)
+ return receiveTaskValueTask.Result;
+
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
receiveTaskValueTask.AsTask();
+ }
+ else
+ {
+ if (receiveTask.IsCompleted)
+ {
+ _receiveTaskQueueForSubReaders[_subReaderIndex] =
_emptyTaskCompletionSource.Task;
+ return receiveTask.Result;
+ }
+ }
+ if (iterations == _subReaders.Count)
+ await
Task.WhenAny(_receiveTaskQueueForSubReaders).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ return await
_subReaders[Topic].Receive(cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] = _subReaders.Values.ElementAt(i).Seek(messageId,
cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(messageId,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (_isPartitioned)
+ {
+ var seekTasks = new Task[_numberOfPartitions];
+ for (var i = 0; i < _subReaders.Values.Count; i++)
+ {
+ seekTasks[i] =
_subReaders.Values.ElementAt(i).Seek(publishTime, cancellationToken).AsTask();
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
+ else
+ {
+ await _subReaders[Topic].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
+ }
}
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ReaderDisposed(_correlationId));
- await DisposeChannel().ConfigureAwait(false);
- }
-
- private async ValueTask DisposeChannel()
- {
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
+ foreach (var subConsumer in _subReaders.Values)
+ {
+ await subConsumer.DisposeAsync().ConfigureAwait(false);
+ }
}
- private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
+ private StateManager<ReaderState> CreateStateManager()
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ return new StateManager<ReaderState>(ReaderState.Disconnected,
ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ private string GetPartitionedTopicName(int partitionNumber)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
-
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
-
- _channel = channel;
- }
-
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
- private void Guard()
- {
- if (_isDisposed != 0)
- throw new ReaderDisposedException(GetType().FullName!);
-
- if (_faultException is not null)
- throw new ReaderFaultedException(_faultException);
+ return $"{Topic}-partition-{partitionNumber}";
}
- public async ValueTask ChannelFaulted(Exception exception)
+ private async Task Guard(CancellationToken cancellationToken)
{
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
+ if (!_allSubReadersIsReady)
+ await
_semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
Review Comment:
Should be released again?
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -34,13 +35,12 @@ public ConsumerTests(IntegrationFixture fixture)
_fixture = fixture;
}
- [Theory]
- [InlineData(10000)]
- public async Task Messages_GivenTopicWithMessages_ShouldConsumeAll(int
numberOfMessages)
+ [Fact]
+ public async Task Consumer_GivenTopicWithMessages_ShouldConsumeAll()
Review Comment:
So we are testing that we Receive all messages? Then the test should start
with "Receive" instead of "Consumer"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]