This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a1cf401 Fix `StartMessageIdInclusive` not work when reader reads from
latest msg id (#386)
a1cf401 is described below
commit a1cf401ec0dd423871537559050396a2b001065d
Author: Zike Yang <[email protected]>
AuthorDate: Mon Jan 15 15:11:23 2024 +0800
Fix `StartMessageIdInclusive` not work when reader reads from latest msg id
(#386)
Fixes #385
### Motivation
The reader with `StartMessageIdInclusive` enabled should be able to read
messages from the latest message ID.
### Modifications
- If `StartMessageIdInclusive` is enabled, the reader will seek and read
the latest message in the topic.
---
lib/ConsumerImpl.cc | 30 +++++++++++++++++++++++-------
tests/ReaderTest.cc | 32 ++++++++++++++++++++++++++++++++
2 files changed, 55 insertions(+), 7 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 5216218..dbd3b65 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1508,18 +1508,34 @@ void
ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
if (messageId == MessageId::latest()) {
lock.unlock();
- getLastMessageIdAsync([callback](Result result, const
GetLastMessageIdResponse& response) {
+ auto self = get_shared_this_ptr();
+ getLastMessageIdAsync([self, callback](Result result, const
GetLastMessageIdResponse& response) {
if (result != ResultOk) {
callback(result, {});
return;
}
- if (response.hasMarkDeletePosition() &&
response.getLastMessageId().entryId() >= 0) {
- // We only care about comparing ledger ids and entry ids as
mark delete position doesn't have
- // other ids such as batch index
- callback(ResultOk,
compareLedgerAndEntryId(response.getMarkDeletePosition(),
-
response.getLastMessageId()) < 0);
+ auto handleResponse = [self, response, callback] {
+ if (response.hasMarkDeletePosition() &&
response.getLastMessageId().entryId() >= 0) {
+ // We only care about comparing ledger ids and entry ids
as mark delete position doesn't
+ // have other ids such as batch index
+ auto compareResult =
compareLedgerAndEntryId(response.getMarkDeletePosition(),
+
response.getLastMessageId());
+ callback(ResultOk,
self->config_.isStartMessageIdInclusive() ? compareResult <= 0
+
: compareResult < 0);
+ } else {
+ callback(ResultOk, false);
+ }
+ };
+ if (self->config_.isStartMessageIdInclusive()) {
+ self->seekAsync(response.getLastMessageId(), [callback,
handleResponse](Result result) {
+ if (result != ResultOk) {
+ callback(result, {});
+ return;
+ }
+ handleResponse();
+ });
} else {
- callback(ResultOk, false);
+ handleResponse();
}
});
} else {
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index ac2fa23..723972d 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -752,4 +752,36 @@ TEST(ReaderSeekTest, testSeekForMessageId) {
producer.close();
}
+TEST(ReaderSeekTest, testStartAtLatestMessageId) {
+ Client client(serviceUrl);
+
+ const std::string topic = "test-seek-latest-message-id-" +
std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+ MessageId id;
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("msg").build(), id));
+
+ Reader readerExclusive;
+ ASSERT_EQ(ResultOk,
+ client.createReader(topic, MessageId::latest(),
ReaderConfiguration(), readerExclusive));
+
+ Reader readerInclusive;
+ ASSERT_EQ(ResultOk,
+ client.createReader(topic, MessageId::latest(),
+
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
+
+ Message msg;
+ bool hasMsgAvaliable = false;
+ readerInclusive.hasMessageAvailable(hasMsgAvaliable);
+ ASSERT_TRUE(hasMsgAvaliable);
+ ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
+ ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000));
+
+ readerExclusive.close();
+ readerInclusive.close();
+ producer.close();
+}
+
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));