blankensteiner commented on code in PR #162:
URL: https://github.com/apache/pulsar-dotpulsar/pull/162#discussion_r1295745279
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -19,71 +19,202 @@ namespace DotPulsar.Tests;
using FluentAssertions;
using System;
using System.Collections.Generic;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
+using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
public class ConsumerTests
{
private readonly IntegrationFixture _fixture;
+ private readonly ITestOutputHelper _testOutputHelper;
- public ConsumerTests(IntegrationFixture fixture)
+ public ConsumerTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
{
_fixture = fixture;
+ _testOutputHelper = testOutputHelper;
}
- [Theory]
- [InlineData(10000)]
- public async Task Messages_GivenTopicWithMessages_ShouldConsumeAll(int
numberOfMessages)
+ [Fact]
+ public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var testRunId = Guid.NewGuid().ToString("N");
+ var topicName =
"persistent://public/default/consumer-getlastmessageid-given-non-partitioned-topic";
Review Comment:
const?
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -19,71 +19,202 @@ namespace DotPulsar.Tests;
using FluentAssertions;
using System;
using System.Collections.Generic;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
+using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
public class ConsumerTests
{
private readonly IntegrationFixture _fixture;
+ private readonly ITestOutputHelper _testOutputHelper;
- public ConsumerTests(IntegrationFixture fixture)
+ public ConsumerTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
{
_fixture = fixture;
+ _testOutputHelper = testOutputHelper;
}
- [Theory]
- [InlineData(10000)]
- public async Task Messages_GivenTopicWithMessages_ShouldConsumeAll(int
numberOfMessages)
+ [Fact]
+ public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var testRunId = Guid.NewGuid().ToString("N");
+ var topicName =
"persistent://public/default/consumer-getlastmessageid-given-non-partitioned-topic";
+ const string subscriptionName =
"subscription-given-given-partitioned-topic";
+ const string consumerName = $"consumer-given-partitioned-topic";
+ const string content = "test-message";
+ const int numberOfMessages = 6;
- var topic = $"persistent://public/default/consumer-tests-{testRunId}";
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var producer = CreateProducer(client, topicName);
- await using var client = PulsarClient.Builder()
- .ServiceUrl(_fixture.ServiceUrl)
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .Build();
+ MessageId expected = null!;
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await producer.Send(content);
+ if (i >= 5)
+ {
+ expected = messageId;
+ }
+ }
- await using var consumer = client.NewConsumer(Schema.ByteArray)
- .ConsumerName($"consumer-{testRunId}")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .SubscriptionName($"subscription-{testRunId}")
- .Topic(topic)
- .Create();
+ //Act
+ var actual = await consumer.GetLastMessageId();
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
+ [Fact]
+ public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
+ {
+ //Arrange
+ var topicName =
"persistent://public/default/consumer-getlastmessageid-given-partitioned-topic";
Review Comment:
const?
##########
tests/DotPulsar.Tests/ConsumerTests.cs:
##########
@@ -19,71 +19,202 @@ namespace DotPulsar.Tests;
using FluentAssertions;
using System;
using System.Collections.Generic;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
+using Xunit.Abstractions;
[Collection("Integration"), Trait("Category", "Integration")]
public class ConsumerTests
{
private readonly IntegrationFixture _fixture;
+ private readonly ITestOutputHelper _testOutputHelper;
- public ConsumerTests(IntegrationFixture fixture)
+ public ConsumerTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
{
_fixture = fixture;
+ _testOutputHelper = testOutputHelper;
}
- [Theory]
- [InlineData(10000)]
- public async Task Messages_GivenTopicWithMessages_ShouldConsumeAll(int
numberOfMessages)
+ [Fact]
+ public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
//Arrange
- var testRunId = Guid.NewGuid().ToString("N");
+ var topicName =
"persistent://public/default/consumer-getlastmessageid-given-non-partitioned-topic";
+ const string subscriptionName =
"subscription-given-given-partitioned-topic";
+ const string consumerName = $"consumer-given-partitioned-topic";
+ const string content = "test-message";
+ const int numberOfMessages = 6;
- var topic = $"persistent://public/default/consumer-tests-{testRunId}";
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var producer = CreateProducer(client, topicName);
- await using var client = PulsarClient.Builder()
- .ServiceUrl(_fixture.ServiceUrl)
- .Authentication(AuthenticationFactory.Token(ct =>
ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan))))
- .Build();
+ MessageId expected = null!;
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await producer.Send(content);
+ if (i >= 5)
+ {
+ expected = messageId;
+ }
+ }
- await using var consumer = client.NewConsumer(Schema.ByteArray)
- .ConsumerName($"consumer-{testRunId}")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .SubscriptionName($"subscription-{testRunId}")
- .Topic(topic)
- .Create();
+ //Act
+ var actual = await consumer.GetLastMessageId();
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
+ [Fact]
+ public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
+ {
+ //Arrange
+ var topicName =
"persistent://public/default/consumer-getlastmessageid-given-partitioned-topic";
+ const string subscriptionName =
"subscription-given-given-partitioned-topic";
+ const string consumerName = "consumer-given-partitioned-topic";
+ const int partitions = 3;
+ _fixture.CreatePartitionedTopic(topicName, partitions);
+
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+
+ //Act
+ var exception = await Record.ExceptionAsync(() =>
consumer.GetLastMessageId().AsTask());
+
+ //Assert
+ exception.Should().BeOfType<NotSupportedException>();
+ }
+
+ [Fact]
+ public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
+ {
+ //Arrange
+ const string topicName =
"consumer-getlastmessageids-given-non-partitioned-topic";
+ const string subscriptionName = "subscription-should_have-3-topics";
+ const string consumerName = "consumer-should_have-3-topics";
+ const string content = "test-message";
+ const int numberOfMessages = 6;
+
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var producer = CreateProducer(client, topicName);
+
+ var expected = new List<MessageId>();
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await producer.Send(content);
+ if (i >= 5)
+ {
+ expected.Add(messageId);
+ }
+ }
+
+ //Act
+ var actual = await consumer.GetLastMessageIds();
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
+ [Fact]
+ public async Task
GetLastMessageIds_GivenPartitionedTopic_ShouldGetMessageIdFromAllPartitions()
+ {
+ //Arrange
+ const string topicName =
"consumer-getlastmessageids-given-partitioned-topic";
+ const string subscriptionName = "subscription-should_have-3-topics";
+ const string consumerName = "consumer-should_have-3-topics";
+ const string content = "test-message";
+ const int numberOfMessages = 6;
+ const int partitions = 3;
+
_fixture.CreatePartitionedTopic($"persistent://public/default/{topicName}",
partitions);
- await using var producer = client.NewProducer(Schema.ByteArray)
- .ProducerName($"producer-{testRunId}")
- .Topic(topic)
- .Create();
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ await using var producer = CreateProducer(client, topicName);
+
+ var expected = new List<MessageId>();
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await producer.Send(content);
+ if (i >= 3)
+ {
+ expected.Add(messageId);
+ }
+ }
+
+ //Act
+ var actual = await consumer.GetLastMessageIds();
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
+ [Fact]
+ public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
+ {
+ //Arrange
+ var topicName =
"persistent://public/default/consumer-given-topic-with-messages";
Review Comment:
const?
##########
tests/DotPulsar.Tests/ReaderTests.cs:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using FluentAssertions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+[Collection("Integration"), Trait("Category", "Integration")]
+public class ReaderTests
+{
+ private readonly IntegrationFixture _fixture;
+ private readonly ITestOutputHelper _testOutputHelper;
+
+ public ReaderTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
+ {
+ _fixture = fixture;
+ _testOutputHelper = testOutputHelper;
+ }
+
+ [Fact]
+ public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageId()
+ {
+ //Arrange
+ var topicName =
"persistent://public/default/reader-non-partitioned-topic-get-last-MessageId";
+ const string content = "test-message";
+ const int numberOfMessages = 6;
+
+ await using var client = CreateClient();
+ await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
+ await using var producer = CreateProducer(client, topicName);
+
+ MessageId expected = null!;
+ for (var i = 0; i < numberOfMessages; i++)
+ {
+ var messageId = await producer.Send(content);
+ if (i >= 5)
+ {
+ expected = messageId;
+ }
+ }
+
+ //Act
+ var actual = await reader.GetLastMessageId();
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
+ [Fact]
+ public async Task
GetLastMessageId_GivenPartitionedTopic_ShouldThrowException()
+ {
+ //Arrange
+ var topicName =
"persistent://public/default/reader-partitioned-topic-get-last-MessageId";
Review Comment:
const?
##########
tests/DotPulsar.Tests/ReaderTests.cs:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests;
+
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using FluentAssertions;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Xunit.Abstractions;
+
+[Collection("Integration"), Trait("Category", "Integration")]
+public class ReaderTests
+{
+ private readonly IntegrationFixture _fixture;
+ private readonly ITestOutputHelper _testOutputHelper;
+
+ public ReaderTests(IntegrationFixture fixture, ITestOutputHelper
testOutputHelper)
+ {
+ _fixture = fixture;
+ _testOutputHelper = testOutputHelper;
+ }
+
+ [Fact]
+ public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageId()
+ {
+ //Arrange
+ var topicName =
"persistent://public/default/reader-non-partitioned-topic-get-last-MessageId";
Review Comment:
const?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]