This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2717848 SubReader still attached topic name on a topic with no
messages. (#177)
2717848 is described below
commit 2717848d63ec72ca85a81c4d1a1895b678a76f9e
Author: entvex <[email protected]>
AuthorDate: Mon Sep 18 13:28:00 2023 +0200
SubReader still attached topic name on a topic with no messages. (#177)
Consumers and readers were attaching the topic name even when there were no
messages available. This issue has been resolved in
https://github.com/apache/pulsar-dotpulsar/pull/175 in the ConsumerChannel.
However, the SubReader code was still adding the topic unnecessarily. I have
now removed this code and added tests to ensure that both the reader and
consumer are functioning properly.
Co-authored-by: David Jensen <[email protected]>
---
CHANGELOG.md | 6 ++++++
src/DotPulsar/Internal/SubReader.cs | 3 +--
tests/DotPulsar.Tests/ConsumerTests.cs | 35 ++++++++++++++++++++++++++++++++++
tests/DotPulsar.Tests/ReaderTests.cs | 31 ++++++++++++++++++++++++++++++
4 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1247972..6f685ea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this
file.
The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [Unreleased]
+
+### Fixed
+
+- When calling GetLastMessageId(s) on a SubReader code was still adding the
topic unnecessarily
+
## [3.0.1] - 2023-09-15
### Changed
diff --git a/src/DotPulsar/Internal/SubReader.cs
b/src/DotPulsar/Internal/SubReader.cs
index f0bfa31..65a6cd4 100644
--- a/src/DotPulsar/Internal/SubReader.cs
+++ b/src/DotPulsar/Internal/SubReader.cs
@@ -82,8 +82,7 @@ public sealed class SubReader<TMessage> : IContainsChannel,
IReader<TMessage>
private async ValueTask<MessageId>
InternalGetLastMessageId(CommandGetLastMessageId command, CancellationToken
cancellationToken)
{
Guard();
- var messageId = await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
- return new MessageId(messageId.LedgerId, messageId.EntryId,
messageId.Partition, messageId.BatchIndex, Topic);
+ return await _channel.Send(command,
cancellationToken).ConfigureAwait(false);
}
public async ValueTask<IMessage<TMessage>> Receive(CancellationToken
cancellationToken)
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs
b/tests/DotPulsar.Tests/ConsumerTests.cs
index 6460dbe..4b446e1 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -37,6 +37,23 @@ public class ConsumerTests
_testOutputHelper = testOutputHelper;
}
+ [Fact]
+ public async Task
GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
+ {
+ //Arrange
+ const string topicName =
$"consumer-{nameof(GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
+ const string subscriptionName = "subscription-given-given-empty-topic";
+ const string consumerName = $"consumer-given-empty-topic";
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+
+ //Act
+ var actual = await consumer.GetLastMessageId();
+
+ //Assert
+ actual.Should().BeEquivalentTo(MessageId.Earliest);
+ }
+
[Fact]
public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{
@@ -152,6 +169,24 @@ public class ConsumerTests
actual.Should().BeEquivalentTo(expected);
}
+ [Fact]
+ public async Task
GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
+ {
+ //Arrange
+ const string topicName =
$"consumer-{nameof(GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
+ const string subscriptionName = "subscription-given-given-empty-topic";
+ const string consumerName = $"consumer-given-empty-topic";
+ await using var client = CreateClient();
+ await using var consumer = CreateConsumer(client,
SubscriptionInitialPosition.Earliest, topicName, consumerName,
subscriptionName);
+ var expected = new List<MessageId>() { MessageId.Earliest };
+
+ //Act
+ var actual = await consumer.GetLastMessageIds();
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
[Fact]
public async Task Receive_GivenNonPartitionedTopic_ShouldReceiveAll()
{
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs
b/tests/DotPulsar.Tests/ReaderTests.cs
index a2c546f..91e164e 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/ReaderTests.cs
@@ -37,6 +37,21 @@ public class ReaderTests
_testOutputHelper = testOutputHelper;
}
+ [Fact]
+ public async Task
GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
+ {
+ //Arrange
+ const string topicName =
$"reader-{nameof(GetLastMessageId_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
+ await using var client = CreateClient();
+ await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
+
+ //Act
+ var actual = await reader.GetLastMessageId();
+
+ //Assert
+ actual.Should().BeEquivalentTo(MessageId.Earliest);
+ }
+
[Fact]
public async Task
GetLastMessageId_GivenNonPartitionedTopic_ShouldGetMessageId()
{
@@ -84,6 +99,22 @@ public class ReaderTests
exception.Should().BeOfType<NotSupportedException>();
}
+ [Fact]
+ public async Task
GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest()
+ {
+ //Arrange
+ const string topicName =
$"reader-{nameof(GetLastMessageIds_GivenEmptyTopic_ShouldBeEqualToMessageIdEarliest)}";
+ await using var client = CreateClient();
+ await using var reader = CreateReader(client, MessageId.Earliest,
topicName);
+ var expected = new List<MessageId>() { MessageId.Earliest };
+
+ //Act
+ var actual = await reader.GetLastMessageIds();
+
+ //Assert
+ actual.Should().BeEquivalentTo(expected);
+ }
+
[Fact]
public async Task
GetLastMessageIds_GivenNonPartitionedTopic_ShouldGetMessageIdFromPartition()
{