This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ef3ddf  [feat] Support configure startMessageIdInclusive for the 
reader (#291)
0ef3ddf is described below

commit 0ef3ddf743c3e473b14382acc5056d43cb5fef5b
Author: Zike Yang <[email protected]>
AuthorDate: Mon Jun 26 11:17:41 2023 +0800

    [feat] Support configure startMessageIdInclusive for the reader (#291)
---
 include/pulsar/ReaderConfiguration.h | 15 ++++++++++
 lib/ReaderConfiguration.cc           |  7 +++++
 lib/ReaderConfigurationImpl.h        |  1 +
 lib/ReaderImpl.cc                    |  1 +
 tests/ReaderTest.cc                  | 54 +++++++++++++++++++++++++++++++++++-
 5 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/include/pulsar/ReaderConfiguration.h 
b/include/pulsar/ReaderConfiguration.h
index 4f6464f..f72abf6 100644
--- a/include/pulsar/ReaderConfiguration.h
+++ b/include/pulsar/ReaderConfiguration.h
@@ -263,6 +263,21 @@ class PULSAR_PUBLIC ReaderConfiguration {
      */
     ReaderConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction 
action);
 
+    /**
+     * Set the reader to include the startMessageId or given position of any 
reset operation like
+     * Reader::seek.
+     *
+     * Default: false
+     *
+     * @param startMessageIdInclusive whether to include the reset position
+     */
+    ReaderConfiguration& setStartMessageIdInclusive(bool 
startMessageIdInclusive);
+
+    /**
+     * The associated getter of setStartMessageIdInclusive
+     */
+    bool isStartMessageIdInclusive() const;
+
     /**
      * Check whether the message has a specific property attached.
      *
diff --git a/lib/ReaderConfiguration.cc b/lib/ReaderConfiguration.cc
index 3ba7fed..f1dba5e 100644
--- a/lib/ReaderConfiguration.cc
+++ b/lib/ReaderConfiguration.cc
@@ -120,6 +120,13 @@ ReaderConfiguration& 
ReaderConfiguration::setCryptoFailureAction(ConsumerCryptoF
     return *this;
 }
 
+ReaderConfiguration& ReaderConfiguration::setStartMessageIdInclusive(bool 
startMessageIdInclusive) {
+    impl_->startMessageIdInclusive = startMessageIdInclusive;
+    return *this;
+}
+
+bool ReaderConfiguration::isStartMessageIdInclusive() const { return 
impl_->startMessageIdInclusive; }
+
 bool ReaderConfiguration::hasProperty(const std::string& name) const {
     const auto& properties = impl_->properties;
     return properties.find(name) != properties.cend();
diff --git a/lib/ReaderConfigurationImpl.h b/lib/ReaderConfigurationImpl.h
index 6f38c29..c92397c 100644
--- a/lib/ReaderConfigurationImpl.h
+++ b/lib/ReaderConfigurationImpl.h
@@ -38,6 +38,7 @@ struct ReaderConfigurationImpl {
     CryptoKeyReaderPtr cryptoKeyReader;
     ConsumerCryptoFailureAction cryptoFailureAction;
     std::map<std::string, std::string> properties;
+    bool startMessageIdInclusive{false};
 };
 }  // namespace pulsar
 #endif /* LIB_READERCONFIGURATIONIMPL_H_ */
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index d6a8f1d..f41106e 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -59,6 +59,7 @@ void ReaderImpl::start(const MessageId& startMessageId,
     consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
     consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
     consumerConf.setProperties(readerConf_.getProperties());
+    
consumerConf.setStartMessageIdInclusive(readerConf_.isStartMessageIdInclusive());
 
     if (readerConf_.getReaderName().length() > 0) {
         consumerConf.setConsumerName(readerConf_.getReaderName());
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index afef384..ac2fa23 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -700,4 +700,56 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
     client.close();
 }
 
-INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
\ No newline at end of file
+TEST(ReaderSeekTest, testSeekForMessageId) {
+    Client client(serviceUrl);
+
+    const std::string topic = "test-seek-for-message-id-" + 
std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    Reader readerExclusive;
+    ASSERT_EQ(ResultOk,
+              client.createReader(topic, MessageId::latest(), 
ReaderConfiguration(), readerExclusive));
+
+    Reader readerInclusive;
+    ASSERT_EQ(ResultOk,
+              client.createReader(topic, MessageId::latest(),
+                                  
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
+
+    const auto numMessages = 100;
+    MessageId seekMessageId;
+
+    int r = (rand() % (numMessages - 1));
+    for (int i = 0; i < numMessages; i++) {
+        MessageId id;
+        ASSERT_EQ(ResultOk,
+                  producer.send(MessageBuilder().setContent("msg-" + 
std::to_string(i)).build(), id));
+
+        if (i == r) {
+            seekMessageId = id;
+        }
+    }
+
+    LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r);
+
+    readerExclusive.seek(seekMessageId);
+    Message msg0;
+    ASSERT_EQ(ResultOk, readerExclusive.readNext(msg0, 3000));
+
+    readerInclusive.seek(seekMessageId);
+    Message msg1;
+    ASSERT_EQ(ResultOk, readerInclusive.readNext(msg1, 3000));
+
+    LOG_INFO("readerExclusive received " << msg0.getDataAsString() << " from " 
<< msg0.getMessageId());
+    LOG_INFO("readerInclusive received " << msg1.getDataAsString() << " from " 
<< msg1.getMessageId());
+
+    ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1));
+    ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r));
+
+    readerExclusive.close();
+    readerInclusive.close();
+    producer.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));

Reply via email to