This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 0ef3ddf [feat] Support configure startMessageIdInclusive for the
reader (#291)
0ef3ddf is described below
commit 0ef3ddf743c3e473b14382acc5056d43cb5fef5b
Author: Zike Yang <[email protected]>
AuthorDate: Mon Jun 26 11:17:41 2023 +0800
[feat] Support configure startMessageIdInclusive for the reader (#291)
---
include/pulsar/ReaderConfiguration.h | 15 ++++++++++
lib/ReaderConfiguration.cc | 7 +++++
lib/ReaderConfigurationImpl.h | 1 +
lib/ReaderImpl.cc | 1 +
tests/ReaderTest.cc | 54 +++++++++++++++++++++++++++++++++++-
5 files changed, 77 insertions(+), 1 deletion(-)
diff --git a/include/pulsar/ReaderConfiguration.h
b/include/pulsar/ReaderConfiguration.h
index 4f6464f..f72abf6 100644
--- a/include/pulsar/ReaderConfiguration.h
+++ b/include/pulsar/ReaderConfiguration.h
@@ -263,6 +263,21 @@ class PULSAR_PUBLIC ReaderConfiguration {
*/
ReaderConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction
action);
+ /**
+ * Set the reader to include the startMessageId or given position of any
reset operation like
+ * Reader::seek.
+ *
+ * Default: false
+ *
+ * @param startMessageIdInclusive whether to include the reset position
+ */
+ ReaderConfiguration& setStartMessageIdInclusive(bool
startMessageIdInclusive);
+
+ /**
+ * The associated getter of setStartMessageIdInclusive
+ */
+ bool isStartMessageIdInclusive() const;
+
/**
* Check whether the message has a specific property attached.
*
diff --git a/lib/ReaderConfiguration.cc b/lib/ReaderConfiguration.cc
index 3ba7fed..f1dba5e 100644
--- a/lib/ReaderConfiguration.cc
+++ b/lib/ReaderConfiguration.cc
@@ -120,6 +120,13 @@ ReaderConfiguration&
ReaderConfiguration::setCryptoFailureAction(ConsumerCryptoF
return *this;
}
+ReaderConfiguration& ReaderConfiguration::setStartMessageIdInclusive(bool
startMessageIdInclusive) {
+ impl_->startMessageIdInclusive = startMessageIdInclusive;
+ return *this;
+}
+
+bool ReaderConfiguration::isStartMessageIdInclusive() const { return
impl_->startMessageIdInclusive; }
+
bool ReaderConfiguration::hasProperty(const std::string& name) const {
const auto& properties = impl_->properties;
return properties.find(name) != properties.cend();
diff --git a/lib/ReaderConfigurationImpl.h b/lib/ReaderConfigurationImpl.h
index 6f38c29..c92397c 100644
--- a/lib/ReaderConfigurationImpl.h
+++ b/lib/ReaderConfigurationImpl.h
@@ -38,6 +38,7 @@ struct ReaderConfigurationImpl {
CryptoKeyReaderPtr cryptoKeyReader;
ConsumerCryptoFailureAction cryptoFailureAction;
std::map<std::string, std::string> properties;
+ bool startMessageIdInclusive{false};
};
} // namespace pulsar
#endif /* LIB_READERCONFIGURATIONIMPL_H_ */
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index d6a8f1d..f41106e 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -59,6 +59,7 @@ void ReaderImpl::start(const MessageId& startMessageId,
consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
consumerConf.setProperties(readerConf_.getProperties());
+
consumerConf.setStartMessageIdInclusive(readerConf_.isStartMessageIdInclusive());
if (readerConf_.getReaderName().length() > 0) {
consumerConf.setConsumerName(readerConf_.getReaderName());
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index afef384..ac2fa23 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -700,4 +700,56 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
client.close();
}
-INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
\ No newline at end of file
+TEST(ReaderSeekTest, testSeekForMessageId) {
+ Client client(serviceUrl);
+
+ const std::string topic = "test-seek-for-message-id-" +
std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+ Reader readerExclusive;
+ ASSERT_EQ(ResultOk,
+ client.createReader(topic, MessageId::latest(),
ReaderConfiguration(), readerExclusive));
+
+ Reader readerInclusive;
+ ASSERT_EQ(ResultOk,
+ client.createReader(topic, MessageId::latest(),
+
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
+
+ const auto numMessages = 100;
+ MessageId seekMessageId;
+
+ int r = (rand() % (numMessages - 1));
+ for (int i = 0; i < numMessages; i++) {
+ MessageId id;
+ ASSERT_EQ(ResultOk,
+ producer.send(MessageBuilder().setContent("msg-" +
std::to_string(i)).build(), id));
+
+ if (i == r) {
+ seekMessageId = id;
+ }
+ }
+
+ LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r);
+
+ readerExclusive.seek(seekMessageId);
+ Message msg0;
+ ASSERT_EQ(ResultOk, readerExclusive.readNext(msg0, 3000));
+
+ readerInclusive.seek(seekMessageId);
+ Message msg1;
+ ASSERT_EQ(ResultOk, readerInclusive.readNext(msg1, 3000));
+
+ LOG_INFO("readerExclusive received " << msg0.getDataAsString() << " from "
<< msg0.getMessageId());
+ LOG_INFO("readerInclusive received " << msg1.getDataAsString() << " from "
<< msg1.getMessageId());
+
+ ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1));
+ ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r));
+
+ readerExclusive.close();
+ readerInclusive.close();
+ producer.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));