blankensteiner commented on code in PR #162:
URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1293171395
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -70,6 +69,115 @@ public async Task
Messages_GivenTopicWithMessages_ShouldConsumeAll(int numberOfM
consumed.Should().BeEquivalentTo(produced);
}
+ [Fact]
+ public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ var topicName =
$"persistent://public/default/consumer-tests-{testRunId}";
+
+ _fixture.CreatePartitionedTopic(topicName, partitions);
+
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .Build();
+
+ var consumer = client.NewConsumer(Schema.ByteArray)
+ .ConsumerName($"consumer-{testRunId}")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName($"subscription-{testRunId}")
+ .Topic(topicName)
+ .Create();
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId().AsTask());
+
+ //Assert
+ exception.Should().BeOfType<NotSupportedException>();
+ }
+
+ [Fact]
+ public async Task
Receive_GivenPartitionedTopicWithMessages_ShouldReceiveAll()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ const int numberOfMessages = 10000;
+ var topicName = $"consumer-with-3-partitions-test";
+
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+
+ await using var client = PulsarClient.Builder()
Review Comment:
Could the creation of the client, consumer, and producer be in helper
methods, so that each test can share that code and thereby be shorter?
##########
src/DotPulsar/Internal/Consumer.cs:
##########
@@ -86,140 +143,270 @@ public async ValueTask DisposeAsync()
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
- _eventRegister.Register(new ConsumerDisposed(_correlationId));
- await DisposeChannel().ConfigureAwait(false);
+ foreach (var subConsumer in _subConsumers)
+ {
+ await subConsumer.DisposeAsync().ConfigureAwait(false);
+ }
}
- private async ValueTask DisposeChannel()
+ public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
{
- await
_channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
- await _channel.DisposeAsync().ConfigureAwait(false);
- }
+ await Guard(cancellationToken).ConfigureAwait(false);
- public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
- => await _executor.Execute(() => InternalReceive(cancellationToken),
cancellationToken).ConfigureAwait(false);
+ if (!_isPartitioned)
+ return await
_subConsumers[_subConsumerIndex].Receive(cancellationToken).ConfigureAwait(false);
+
+ var iterations = 0;
+ while (true)
+ {
+ iterations++;
+ _subConsumerIndex++;
+ if (_subConsumerIndex == _subConsumers.Length)
+ _subConsumerIndex = 0;
+
+ var receiveTask =
_receiveTaskQueueForSubConsumers[_subConsumerIndex];
+ if (receiveTask == _emptyTaskCompletionSource.Task)
+ {
+ var receiveTaskValueTask =
_subConsumers[_subConsumerIndex].Receive(cancellationToken);
+ if (receiveTaskValueTask.IsCompleted)
+ return receiveTaskValueTask.Result;
+ _receiveTaskQueueForSubConsumers[_subConsumerIndex] =
receiveTaskValueTask.AsTask();
+ }
+ else
+ {
+ if (receiveTask.IsCompleted)
+ {
+ _receiveTaskQueueForSubConsumers[_subConsumerIndex] =
_emptyTaskCompletionSource.Task;
+ return receiveTask.Result;
+ }
+ }
+ if (iterations == _subConsumers.Length)
+ await
Task.WhenAny(_receiveTaskQueueForSubConsumers).ConfigureAwait(false);
+ }
+ }
public async ValueTask Acknowledge(MessageId messageId, CancellationToken
cancellationToken)
- => await InternalAcknowledge(messageId, CommandAck.AckType.Individual,
cancellationToken).ConfigureAwait(false);
+ {
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitioned)
+ await _subConsumers[_subConsumerIndex].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
+ else
+ await _subConsumers[messageId.Partition].Acknowledge(messageId,
cancellationToken).ConfigureAwait(false);
+ }
public async ValueTask AcknowledgeCumulative(MessageId messageId,
CancellationToken cancellationToken)
- => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative,
cancellationToken).ConfigureAwait(false);
+ {
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitioned)
+ await
_subConsumers[_subConsumerIndex].AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
+ else
+ await
_subConsumers[messageId.Partition].AcknowledgeCumulative(messageId,
cancellationToken).ConfigureAwait(false);
+ }
public async ValueTask
RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds,
CancellationToken cancellationToken)
{
- var command = new CommandRedeliverUnacknowledgedMessages();
- command.MessageIds.AddRange(messageIds.Select(messageId =>
messageId.ToMessageIdData()));
- await _executor.Execute(() =>
InternalRedeliverUnacknowledgedMessages(command, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitioned)
+ await
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(messageIds,
cancellationToken).ConfigureAwait(false);
+ else
+ {
+ var messageIdSortedIntoTopics = new Dictionary<int,
LinkedList<MessageId>>(_numberOfPartitions);
+ //sort messageIds into topics
+ foreach (var messageId in messageIds)
+ {
+ if (messageIdSortedIntoTopics.ContainsKey(messageId.Partition))
+ {
+
messageIdSortedIntoTopics[messageId.Partition].AddLast(messageId);
+ }
+ else
+ {
+ var linkedList = new LinkedList<MessageId>();
+ linkedList.AddLast(messageId);
+ messageIdSortedIntoTopics.Add(messageId.Partition,
linkedList);
+ }
+ }
+ var redeliverUnacknowledgedMessagesTasks = new
Task[messageIdSortedIntoTopics.Count];
+ var iterations = -1;
+ //Collect tasks from _subConsumers RedeliverUnacknowledgedMessages
without waiting
+ foreach (var messageIdSortedByPartition in
messageIdSortedIntoTopics)
+ {
+ iterations++;
+ var task =
_subConsumers[messageIdSortedByPartition.Key].RedeliverUnacknowledgedMessages(messageIdSortedByPartition.Value,
cancellationToken).AsTask();
+ redeliverUnacknowledgedMessagesTasks[iterations] = task;
+ }
+ //await all of the tasks.
+ await
Task.WhenAll(redeliverUnacknowledgedMessagesTasks).ConfigureAwait(false);
+ }
}
public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken
cancellationToken)
- => await
RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(),
cancellationToken).ConfigureAwait(false);
+ {
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitioned)
+ await
_subConsumers[_subConsumerIndex].RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
+ else
+ {
+ foreach (var subConsumer in _subConsumers)
+ {
+ await
subConsumer.RedeliverUnacknowledgedMessages(cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
- var unsubscribe = new CommandUnsubscribe();
- await _executor.Execute(() => InternalUnsubscribe(unsubscribe,
cancellationToken), cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitioned)
+ await
_subConsumers[_subConsumerIndex].Unsubscribe(cancellationToken).ConfigureAwait(false);
+ else
+ {
+ var unsubscribeTasks = new List<Task>(_numberOfPartitions);
+ foreach (var subConsumer in _subConsumers)
+ {
+ var getLastMessageIdTask =
subConsumer.Unsubscribe(cancellationToken);
+ unsubscribeTasks.Add(getLastMessageIdTask.AsTask());
+ }
+ await Task.WhenAll(unsubscribeTasks).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(MessageId messageId, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitioned)
+ await _subConsumers[_subConsumerIndex].Seek(messageId,
cancellationToken).ConfigureAwait(false);
+ else
+ {
+ var seekTasks = new List<Task>(_numberOfPartitions);
+ foreach (var subConsumer in _subConsumers)
+ {
+ var getLastMessageIdTask = subConsumer.Seek(messageId,
cancellationToken);
+ seekTasks.Add(getLastMessageIdTask.AsTask());
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
}
public async ValueTask Seek(ulong publishTime, CancellationToken
cancellationToken)
{
- var seek = new CommandSeek { MessagePublishTime = publishTime };
- await _executor.Execute(() => InternalSeek(seek, cancellationToken),
cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
+
+ if (!_isPartitioned)
+ await _subConsumers[_subConsumerIndex].Seek(publishTime,
cancellationToken).ConfigureAwait(false);
+ else
+ {
+ var seekTasks = new List<Task>(_numberOfPartitions);
+ foreach (var subConsumer in _subConsumers)
+ {
+ var getLastMessageIdTask = subConsumer.Seek(publishTime,
cancellationToken);
+ seekTasks.Add(getLastMessageIdTask.AsTask());
+ }
+ await Task.WhenAll(seekTasks).ConfigureAwait(false);
+ }
}
+ [Obsolete("GetLastMessageId is obsolete. Please use GetLastMessageIds
instead.")]
public async ValueTask<MessageId> GetLastMessageId(CancellationToken
cancellationToken)
{
- var getLastMessageId = new CommandGetLastMessageId();
- return await _executor.Execute(() =>
InternalGetLastMessageId(getLastMessageId, cancellationToken),
cancellationToken).ConfigureAwait(false);
- }
+ await Guard(cancellationToken).ConfigureAwait(false);
- private void Guard()
- {
- if (_isDisposed != 0)
- throw new ConsumerDisposedException(GetType().FullName!);
+ if (!_isPartitioned)
+ return await
_subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false);
- if (_faultException is not null)
- throw new ConsumerFaultedException(_faultException);
+ throw new NotSupportedException("GetLastMessageId can't be used on
partitioned topics. Please use GetLastMessageIds");
}
- public async Task EstablishNewChannel(CancellationToken cancellationToken)
+ public async ValueTask<IEnumerable<MessageId>>
GetLastMessageIds(CancellationToken cancellationToken)
{
- var channel = await _executor.Execute(() =>
_factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+ await Guard(cancellationToken).ConfigureAwait(false);
- var oldChannel = _channel;
- if (oldChannel is not null)
- await oldChannel.DisposeAsync().ConfigureAwait(false);
+ if (!_isPartitioned)
+ return new[] { await
_subConsumers[_subConsumerIndex].GetLastMessageId(cancellationToken).ConfigureAwait(false)
};
- _channel = channel;
- }
+ var getLastMessageIdsTasks = new
List<Task<MessageId>>(_numberOfPartitions);
- public async ValueTask CloseChannel(CancellationToken cancellationToken)
- => await
_channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
+ foreach (var subConsumer in _subConsumers)
+ {
+ var getLastMessageIdTask =
subConsumer.GetLastMessageId(cancellationToken);
+ getLastMessageIdsTasks.Add(getLastMessageIdTask.AsTask());
+ }
- public async ValueTask ChannelFaulted(Exception exception)
- {
- _faultException = exception;
- await DisposeChannel().ConfigureAwait(false);
- }
+ //await all of the tasks.
+ await Task.WhenAll(getLastMessageIdsTasks).ConfigureAwait(false);
- private async ValueTask InternalAcknowledge(CommandAck command,
CancellationToken cancellationToken)
- {
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ //collect MessageIds
+ var messageIds = new List<MessageId>();
+ for (var i = 0; i < _subConsumers.Length; i++)
+ {
+ messageIds.Add(getLastMessageIdsTasks[i].Result);
+ }
+ return messageIds;
}
- private async ValueTask
InternalRedeliverUnacknowledgedMessages(CommandRedeliverUnacknowledgedMessages
command, CancellationToken cancellationToken)
+ private SubConsumer<TMessage> CreateSubConsumer(string topic)
{
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- }
+ var correlationId = Guid.NewGuid();
+ var consumerName = _consumerOptions.ConsumerName ??
$"Consumer-{correlationId:N}";
- private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
- {
- Guard();
- return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- }
+ var subscribe = new CommandSubscribe
+ {
+ ConsumerName = consumerName,
+ InitialPosition = (CommandSubscribe.InitialPositionType)
_consumerOptions.InitialPosition,
+ PriorityLevel = _consumerOptions.PriorityLevel,
+ ReadCompacted = _consumerOptions.ReadCompacted,
+ ReplicateSubscriptionState =
_consumerOptions.ReplicateSubscriptionState,
+ Subscription = _consumerOptions.SubscriptionName,
+ Topic = topic,
+ Type = (CommandSubscribe.SubType) _consumerOptions.SubscriptionType
+ };
+
+ foreach (var property in _consumerOptions.SubscriptionProperties)
+ {
+ var keyValue = new KeyValue { Key = property.Key, Value =
property.Value };
+ subscribe.SubscriptionProperties.Add(keyValue);
+ }
- private async Task InternalSeek(CommandSeek command, CancellationToken
cancellationToken)
- {
- Guard();
- await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ var messagePrefetchCount = _consumerOptions.MessagePrefetchCount;
+ var messageFactory = new
MessageFactory<TMessage>(_consumerOptions.Schema);
+ var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
+ var decompressorFactories =
CompressionFactories.DecompressorFactories();
+ var consumerChannelFactory = new
ConsumerChannelFactory<TMessage>(correlationId, _processManager,
_connectionPool, subscribe,
+ messagePrefetchCount, batchHandler, messageFactory,
decompressorFactories, topic);
+ var stateManager = CreateStateManager();
+ var initialChannel = new NotReadyChannel<TMessage>();
+ var executor = new Executor(correlationId, _processManager,
_exceptionHandler);
+
+ var subConsumer = new SubConsumer<TMessage>(correlationId, ServiceUrl,
_consumerOptions.SubscriptionName, topic,
+ _processManager, initialChannel, executor, stateManager,
consumerChannelFactory);
+
+ if (_consumerOptions.StateChangedHandler is not null)
+ _ = StateMonitor.MonitorConsumer(subConsumer,
_consumerOptions.StateChangedHandler);
+
+ var process = new ConsumerProcess(correlationId, stateManager,
subConsumer, _consumerOptions.SubscriptionType == SubscriptionType.Failover);
+ _processManager.Add(process);
+ process.Start();
+ return subConsumer;
}
-
- private async ValueTask<IMessage<TMessage>>
InternalReceive(CancellationToken cancellationToken)
+ private string GetPartitionedTopicName(int partitionNumber)
Review Comment:
Spacing between methods
##########
tests/DotPulsar.Tests/ReaderTests.cs:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using FluentAssertions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+[Collection("Integration"), Trait("Category", "Integration")]
+public class ReaderTests
+{
+ private readonly IntegrationFixture _fixture;
+ private readonly ITestOutputHelper _testOutputHelper;
+
+ public ReaderTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
+ {
+ _fixture = fixture;
+ _testOutputHelper = testOutputHelper;
+ }
+
+ [Fact]
+ public async Task Receive_GivenTopicWithMessages_ShouldReceiveAll()
+ {
+ //Arrange
+ await using var client = CreateClient();
+ const int numberOfMessages = 10;
+ var topicName = $"simple-produce-consume{Guid.NewGuid():N}";
+
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topicName)
+ .Create();
+
+ await using var reader = client.NewReader(Schema.String)
+ .StartMessageId(MessageId.Earliest)
+ .Topic(topicName)
+ .Create();
+
+ var expected = new List<MessageId>();
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await producer.Send("test-message");
+ expected.Add(messageId);
+ }
+
+ //Act
+ var actual = new List<MessageId>();
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await reader.Receive();
+ actual.Add(messageId.MessageId);
+ }
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
+ [Fact]
+ public async Task
Receive_GivenPartitionedTopicWithMessages_ShouldReceiveAll()
+ {
+ //Arrange
+ const int partitions = 3;
+ const int numberOfMessages = 50;
+ var topicName = $"reader-with-3-partitions-test";
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+
+ await using var client = CreateClient();
+
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topicName)
+ .Create();
+
+ await using var reader = client.NewReader(Schema.String)
+ .StartMessageId(MessageId.Earliest)
+ .Topic(topicName)
+ .Create();
+
+ var expected = new List<MessageId>();
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await producer.Send("test-message");
+ expected.Add(messageId);
+ }
+
+ //Act
+ var actual = new List<MessageId>();
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await reader.Receive();
+ actual.Add(messageId.MessageId);
+ }
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
+ [Fact]
+ public async Task
Receive_GivenPartitionedTopicWithMessages_ShouldReturnMessagesFromDifferentPartitions()
+ {
+ //Arrange
+ const int partitions = 3;
+ const int numberOfMessages = 20;
+ var topicName = $"reader-should-read-from-different-partitions-test";
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+
+ await using var client = CreateClient();
+ const string content = "test-message";
+
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topicName)
+ .Create();
+
+ await using var reader = client.NewReader(Schema.String)
+ .StartMessageId(MessageId.Earliest)
+ .Topic(topicName)
+ .Create();
+
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ await producer.Send(content);
+ }
+
+ //Act
+ var messageIds = new List<MessageId>();
+ await foreach (var message in reader.Messages())
+ {
+ messageIds.Add(message.MessageId);
+
+ if (messageIds.Count != numberOfMessages)
+ continue;
+
+ break;
+ }
+
+ //Assert
+ var foundNonNegativeOne = false;
+ foreach (var messageId in messageIds)
+ {
+ if (!messageId.Partition.Equals(-1))
+ foundNonNegativeOne = true;
+ }
+
+ foundNonNegativeOne.Should().Be(true);
+ }
+
+ [Fact]
+ public async Task
GetLastMessageIds_GivenTopicWithThreePartitions_ShouldHaveThreePartitions()
+ {
+ //Arrange
+ const int partitions = 3;
+ const int numberOfMessages = 6;
+ var topicName = $"reader_get_last_message_ids_should_have_3_topics";
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+
+ await using var client = CreateClient();
+
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topicName)
+ .Create();
+
+ await using var reader = client.NewReader(Schema.String)
+ .StartMessageId(MessageId.Earliest)
+ .Topic(topicName)
+ .Create();
+
+ List<MessageId> expected = new List<MessageId>();
Review Comment:
var
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -70,6 +70,154 @@ public async Task
Messages_GivenTopicWithMessages_ShouldConsumeAll(int numberOfM
consumed.Should().BeEquivalentTo(produced);
}
+ [Fact]
+ public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ var topicName =
$"persistent://public/default/consumer-tests-{testRunId}";
+
+ _fixture.CreatePartitionedTopic(topicName, partitions);
+
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .Build();
+
+ var consumer = client.NewConsumer(Schema.ByteArray)
+ .ConsumerName($"consumer-{testRunId}")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName($"subscription-{testRunId}")
+ .Topic(topicName)
+ .Create();
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId().AsTask());
+
+ //Assert
+ exception.Should().BeOfType<NotSupportedException>();
+ }
+
+ [Fact]
+ public async Task
Receive_GivenPartitionedTopicWithMessages_ShouldReceiveAll()
Review Comment:
Then I think we should name it so that we know it is a non-partitioned topic
we are testing
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -70,6 +69,115 @@ public async Task
Messages_GivenTopicWithMessages_ShouldConsumeAll(int numberOfM
consumed.Should().BeEquivalentTo(produced);
}
+ [Fact]
+ public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ var topicName =
$"persistent://public/default/consumer-tests-{testRunId}";
+
+ _fixture.CreatePartitionedTopic(topicName, partitions);
+
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .Build();
+
+ var consumer = client.NewConsumer(Schema.ByteArray)
+ .ConsumerName($"consumer-{testRunId}")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName($"subscription-{testRunId}")
+ .Topic(topicName)
+ .Create();
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId().AsTask());
+
+ //Assert
+ exception.Should().BeOfType<NotSupportedException>();
+ }
+
+ [Fact]
+ public async Task
Receive_GivenPartitionedTopicWithMessages_ShouldReceiveAll()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ const int numberOfMessages = 10000;
+ var topicName = $"consumer-with-3-partitions-test";
+
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .Build();
+
+ await using var consumer = client.NewConsumer(Schema.ByteArray)
+ .ConsumerName($"consumer-{testRunId}")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName($"subscription-{testRunId}")
+ .Topic(topicName)
+ .Create();
+
+ await using var producer = client.NewProducer(Schema.ByteArray)
+ .ProducerName($"producer-{testRunId}")
+ .Topic(topicName)
+ .Create();
+
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+
+ //Act
+ var produced = await ProduceMessages(producer, numberOfMessages,
cts.Token);
+ var consumed = await ConsumeMessages(consumer, numberOfMessages,
cts.Token);
+
+ //Assert
+ consumed.Should().BeEquivalentTo(produced);
+ }
+
+ [Fact]
+ public async Task
GetLastMessageIds_GivenMessageIdsFrom3Partitions_ShouldHave3Partitions()
+ {
+ //Arrange
+ var testRunId = Guid.NewGuid().ToString("N");
+ const int partitions = 3;
+ const int numberOfMessages = 6;
+ var topicName = $"consumer_get_last_message_ids_should_have_3_topics";
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
+
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(_fixture.ServiceUrl)
+ .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
+ .Build();
+
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topicName)
+ .Create();
+
+ await using var consumer = client.NewConsumer(Schema.ByteArray)
+ .ConsumerName($"consumer-{testRunId}")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .SubscriptionName($"subscription-{testRunId}")
+ .Topic(topicName)
+ .Create();
+
+ List<MessageId> expected = new List<MessageId>();
Review Comment:
var
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]