This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e4c74671f0 [feature][client-cpp] Support inclusive seek for cpp 
client (#17209)
7e4c74671f0 is described below

commit 7e4c74671f0f13f0a5d860d19655f09c0a351f58
Author: Kai Wang <[email protected]>
AuthorDate: Wed Sep 28 10:08:24 2022 +0800

    [feature][client-cpp] Support inclusive seek for cpp client (#17209)
    
    Fixes #17186
    
    ### Motivation
    
    There are some cases in which it is useful to be able to include the current
    position of the message when a cursor reset is performed.
    
    ### Modifications
    
    * Support inclusive seek in C++ consumers.
    * Add a unit test to verify.
---
 .../include/pulsar/ConsumerConfiguration.h         |  14 ++
 pulsar-client-cpp/lib/ClientImpl.cc                |   4 +-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |   7 +
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   1 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 165 +++++++++++++--------
 pulsar-client-cpp/lib/ConsumerImpl.h               |  16 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |   9 +-
 pulsar-client-cpp/lib/ReaderImpl.cc                |   7 +-
 pulsar-client-cpp/lib/Synchronized.h               |  42 ++++++
 pulsar-client-cpp/python/pulsar_test.py            |   8 +-
 pulsar-client-cpp/tests/ConsumerTest.cc            |  63 ++++++++
 11 files changed, 257 insertions(+), 79 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index b326ca8fb31..4347c3b2d5f 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -499,6 +499,20 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     bool isAutoAckOldestChunkedMessageOnQueueFull() const;
 
+    /**
+     * Set the consumer to include the given position of any reset operation 
like Consumer::seek.
+     *
+     * Default: false
+     *
+     * @param startMessageIdInclusive whether to include the reset position
+     */
+    ConsumerConfiguration& setStartMessageIdInclusive(bool 
startMessageIdInclusive);
+
+    /**
+     * The associated getter of setStartMessageIdInclusive
+     */
+    bool isStartMessageIdInclusive() const;
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index 08adb1d6423..29e92f3b815 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -378,8 +378,8 @@ void ClientImpl::handleSubscribe(const Result result, const 
LookupDataResultPtr
                                                                  
partitionMetadata->getPartitions(),
                                                                  
subscriptionName, conf, lookupServicePtr_);
         } else {
-            auto consumerImpl = 
std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
-                                                               
subscriptionName, conf);
+            auto consumerImpl = std::make_shared<ConsumerImpl>(
+                shared_from_this(), topicName->toString(), subscriptionName, 
conf, topicName->isPersistent());
             consumerImpl->setPartitionIndex(topicName->getPartitionIndex());
             consumer = consumerImpl;
         }
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 2b58835cdbe..f9fe499b954 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -260,4 +260,11 @@ bool 
ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
     return impl_->autoAckOldestChunkedMessageOnQueueFull;
 }
 
+ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool 
startMessageIdInclusive) {
+    impl_->startMessageIdInclusive = startMessageIdInclusive;
+    return *this;
+}
+
+bool ConsumerConfiguration::isStartMessageIdInclusive() const { return 
impl_->startMessageIdInclusive; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 1c13f729b55..cca83a38829 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -53,6 +53,7 @@ struct ConsumerConfigurationImpl {
     KeySharedPolicy keySharedPolicy;
     size_t maxPendingChunkedMessage{10};
     bool autoAckOldestChunkedMessageOnQueueFull{false};
+    bool startMessageIdInclusive{false};
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 79c20d84649..37fcd95248a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -38,6 +38,7 @@ DECLARE_LOG_OBJECT()
 
 ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& 
topic,
                            const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
+                           bool isPersistent,
                            const ExecutorServicePtr listenerExecutor /* = NULL 
by default */,
                            bool hasParent /* = false by default */,
                            const ConsumerTopicType consumerTopicType /* = 
NonPartitioned by default */,
@@ -47,6 +48,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       config_(conf),
       subscription_(subscriptionName),
       originalSubscriptionName_(subscriptionName),
+      isPersistent_(isPersistent),
       messageListener_(config_.getMessageListener()),
       eventListener_(config_.getConsumerEventListener()),
       hasParent_(hasParent),
@@ -169,14 +171,17 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     // sending the subscribe request.
     cnx->registerConsumer(consumerId_, shared_from_this());
 
-    Lock lockForMessageId(mutexForMessageId_);
-    Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
-    if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
-        // Update startMessageId so that we can discard messages after delivery
-        // restarts
-        startMessageId_ = firstMessageInQueue;
+    if (duringSeek_) {
+        ackGroupingTrackerPtr_->flushAndClean();
     }
-    const auto startMessageId = startMessageId_;
+
+    Lock lockForMessageId(mutexForMessageId_);
+    // Update startMessageId so that we can discard messages after delivery 
restarts
+    const auto startMessageId = clearReceiveQueue();
+    const auto subscribeMessageId = (subscriptionMode_ == 
Commands::SubscriptionModeNonDurable)
+                                        ? startMessageId
+                                        : Optional<MessageId>::empty();
+    startMessageId_ = startMessageId;
     lockForMessageId.unlock();
 
     unAckedMessageTrackerPtr_->clear();
@@ -186,7 +191,7 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     uint64_t requestId = client->newRequestId();
     SharedBuffer cmd = Commands::newSubscribe(
         topic_, subscription_, consumerId_, requestId, getSubType(), 
consumerName_, subscriptionMode_,
-        startMessageId, readCompacted_, config_.getProperties(), 
config_.getSubscriptionProperties(),
+        subscribeMessageId, readCompacted_, config_.getProperties(), 
config_.getSubscriptionProperties(),
         config_.getSchema(), getInitialPosition(), 
config_.isReplicateSubscriptionStateEnabled(),
         config_.getKeySharedPolicy(), config_.getPriorityLevel());
     cnx->sendRequestWithId(cmd, requestId)
@@ -397,12 +402,12 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         return;
     }
 
-    const bool isMessageDecryptable =
-        metadata.encryption_keys_size() <= 0 || 
config_.getCryptoKeyReader().get() ||
+    const bool isMessageUndecryptable =
+        metadata.encryption_keys_size() > 0 && 
!config_.getCryptoKeyReader().get() &&
         config_.getCryptoFailureAction() == 
ConsumerCryptoFailureAction::CONSUME;
 
     const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
-    if (isMessageDecryptable && !isChunkedMessage) {
+    if (!isMessageUndecryptable && !isChunkedMessage) {
         if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, 
payload, true)) {
             // Message was discarded on decompression error
             return;
@@ -446,6 +451,16 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         Lock lock(mutex_);
         numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, 
msg.redelivery_count());
     } else {
+        const auto startMessageId = startMessageId_.get();
+        if (isPersistent_ && startMessageId.is_present() &&
+            m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
+            m.getMessageId().entryId() == startMessageId.value().entryId() &&
+            isPriorEntryIndex(m.getMessageId().entryId())) {
+            LOG_DEBUG(getName() << " Ignoring message from before the 
startMessageId: "
+                                << startMessageId.value());
+            return;
+        }
+
         Lock lock(pendingReceiveMutex_);
         // if asyncReceive is waiting then notify callback without adding to 
incomingMessages queue
         bool asyncReceivedWaiting = !pendingReceives_.empty();
@@ -533,9 +548,7 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     batchAcknowledgementTracker_.receivedMessage(batchedMessage);
     LOG_DEBUG("Received Batch messages of size - " << batchSize
                                                    << " -- msgId: " << 
batchedMessage.getMessageId());
-    Lock lock(mutexForMessageId_);
-    const auto startMessageId = startMessageId_;
-    lock.unlock();
+    const auto startMessageId = startMessageId_.get();
 
     int skippedMessages = 0;
 
@@ -550,9 +563,9 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
 
             // If we are receiving a batch message, we need to discard 
messages that were prior
             // to the startMessageId
-            if (msgId.ledgerId() == startMessageId.value().ledgerId() &&
+            if (isPersistent_ && msgId.ledgerId() == 
startMessageId.value().ledgerId() &&
                 msgId.entryId() == startMessageId.value().entryId() &&
-                msgId.batchIndex() <= startMessageId.value().batchIndex()) {
+                isPriorBatchIndex(msgId.batchIndex())) {
                 LOG_DEBUG(getName() << "Ignoring message from before the 
startMessageId"
                                     << msg.getMessageId());
                 ++skippedMessages;
@@ -842,6 +855,12 @@ void ConsumerImpl::messageProcessed(Message& msg, bool 
track) {
  * not seen by the application
  */
 Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
+    bool expectedDuringSeek = true;
+    if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
+        return Optional<MessageId>::of(seekMessageId_.get());
+    } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
+        return startMessageId_.get();
+    }
     Message nextMessageInQueue;
     if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
         // There was at least one message pending in the queue
@@ -862,7 +881,7 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
     } else {
         // No message was received or dequeued by this consumer. Next message 
would still be the
         // startMessageId
-        return startMessageId_;
+        return startMessageId_.get();
     }
 }
 
@@ -1175,18 +1194,6 @@ void ConsumerImpl::brokerConsumerStatsListener(Result 
res, BrokerConsumerStatsIm
     }
 }
 
-void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
-    if (result == ResultOk) {
-        Lock lock(mutexForMessageId_);
-        lastDequedMessageId_ = MessageId::earliest();
-        lock.unlock();
-        LOG_INFO(getName() << "Seek successfully");
-    } else {
-        LOG_ERROR(getName() << "Failed to seek: " << strResult(result));
-    }
-    callback(result);
-}
-
 void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
     const auto state = state_.load();
     if (state == Closed || state == Closing) {
@@ -1197,25 +1204,13 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, 
ResultCallback callback) {
         return;
     }
 
-    this->ackGroupingTrackerPtr_->flushAndClean();
-    ClientConnectionPtr cnx = getCnx().lock();
-    if (cnx) {
-        ClientImplPtr client = client_.lock();
-        uint64_t requestId = client->newRequestId();
-        LOG_DEBUG(getName() << " Sending seek Command for Consumer - " << 
getConsumerId() << ", requestId - "
-                            << requestId);
-        Future<Result, ResponseData> future =
-            cnx->sendRequestWithId(Commands::newSeek(consumerId_, requestId, 
msgId), requestId);
-
-        if (callback) {
-            future.addListener(
-                std::bind(&ConsumerImpl::handleSeek, shared_from_this(), 
std::placeholders::_1, callback));
-        }
+    ClientImplPtr client = client_.lock();
+    if (!client) {
+        LOG_ERROR(getName() << "Client is expired when seekAsync " << msgId);
         return;
     }
-
-    LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-    callback(ResultNotConnected);
+    const auto requestId = client->newRequestId();
+    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
msgId), msgId, 0L, callback);
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
@@ -1228,24 +1223,14 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, 
ResultCallback callback) {
         return;
     }
 
-    ClientConnectionPtr cnx = getCnx().lock();
-    if (cnx) {
-        ClientImplPtr client = client_.lock();
-        uint64_t requestId = client->newRequestId();
-        LOG_DEBUG(getName() << " Sending seek Command for Consumer - " << 
getConsumerId() << ", requestId - "
-                            << requestId);
-        Future<Result, ResponseData> future =
-            cnx->sendRequestWithId(Commands::newSeek(consumerId_, requestId, 
timestamp), requestId);
-
-        if (callback) {
-            future.addListener(
-                std::bind(&ConsumerImpl::handleSeek, shared_from_this(), 
std::placeholders::_1, callback));
-        }
+    ClientImplPtr client = client_.lock();
+    if (!client) {
+        LOG_ERROR(getName() << "Client is expired when seekAsync " << 
timestamp);
         return;
     }
-
-    LOG_ERROR(getName() << " Client Connection not ready for Consumer");
-    callback(ResultNotConnected);
+    const auto requestId = client->newRequestId();
+    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, 
timestamp), MessageId::earliest(),
+                      timestamp, callback);
 }
 
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1255,9 +1240,10 @@ inline bool hasMoreMessages(const MessageId& 
lastMessageIdInBroker, const Messag
 }
 
 void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback 
callback) {
+    const auto startMessageId = startMessageId_.get();
     Lock lock(mutexForMessageId_);
     const auto messageId =
-        (lastDequedMessageId_ == MessageId::earliest()) ? 
startMessageId_.value() : lastDequedMessageId_;
+        (lastDequedMessageId_ == MessageId::earliest()) ? 
startMessageId.value() : lastDequedMessageId_;
 
     if (messageId == MessageId::latest()) {
         lock.unlock();
@@ -1380,4 +1366,57 @@ bool ConsumerImpl::isConnected() const { return 
!getCnx().expired() && state_ ==
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 
1 : 0; }
 
+void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const 
MessageId& seekId,
+                                     long timestamp, ResultCallback callback) {
+    ClientConnectionPtr cnx = getCnx().lock();
+    if (!cnx) {
+        LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+        callback(ResultNotConnected);
+        return;
+    }
+
+    const auto originalSeekMessageId = seekMessageId_.get();
+    seekMessageId_ = seekId;
+    duringSeek_ = true;
+    if (timestamp > 0) {
+        LOG_INFO(getName() << " Seeking subscription to " << timestamp);
+    } else {
+        LOG_INFO(getName() << " Seeking subscription to " << seekId);
+    }
+
+    std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this()};
+
+    cnx->sendRequestWithId(seek, requestId)
+        .addListener([this, weakSelf, callback, originalSeekMessageId](Result 
result,
+                                                                       const 
ResponseData& responseData) {
+            auto self = weakSelf.lock();
+            if (!self) {
+                callback(result);
+                return;
+            }
+            if (result == ResultOk) {
+                LOG_INFO(getName() << "Seek successfully");
+                ackGroupingTrackerPtr_->flushAndClean();
+                Lock lock(mutexForMessageId_);
+                lastDequedMessageId_ = MessageId::earliest();
+                lock.unlock();
+            } else {
+                LOG_ERROR(getName() << "Failed to seek: " << result);
+                seekMessageId_ = originalSeekMessageId;
+                duringSeek_ = false;
+            }
+            callback(result);
+        });
+}
+
+bool ConsumerImpl::isPriorBatchIndex(int32_t idx) {
+    return config_.isStartMessageIdInclusive() ? idx < 
startMessageId_.get().value().batchIndex()
+                                               : idx <= 
startMessageId_.get().value().batchIndex();
+}
+
+bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
+    return config_.isStartMessageIdInclusive() ? idx < 
startMessageId_.get().value().entryId()
+                                               : idx <= 
startMessageId_.get().value().entryId();
+}
+
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index 70fda0170cc..1ad3a4c3727 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -46,6 +46,7 @@
 #include <lib/stats/ConsumerStatsDisabled.h>
 #include <queue>
 #include <atomic>
+#include "Synchronized.h"
 
 using namespace pulsar;
 
@@ -69,7 +70,7 @@ class ConsumerImpl : public ConsumerImplBase,
                      public std::enable_shared_from_this<ConsumerImpl> {
    public:
     ConsumerImpl(const ClientImplPtr client, const std::string& topic, const 
std::string& subscriptionName,
-                 const ConsumerConfiguration&,
+                 const ConsumerConfiguration&, bool isPersistent,
                  const ExecutorServicePtr listenerExecutor = 
ExecutorServicePtr(), bool hasParent = false,
                  const ConsumerTopicType consumerTopicType = NonPartitioned,
                  Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
@@ -138,7 +139,6 @@ class ConsumerImpl : public ConsumerImplBase,
 
     virtual void redeliverMessages(const std::set<MessageId>& messageIds);
 
-    void handleSeek(Result result, ResultCallback callback);
     virtual bool isReadCompacted();
     virtual void hasMessageAvailableAsync(HasMessageAvailableCallback 
callback);
     virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback 
callback);
@@ -169,6 +169,8 @@ class ConsumerImpl : public ConsumerImplBase,
     void drainIncomingMessageQueue(size_t count);
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& 
cnx, Message& batchedMessage,
                                                 int redeliveryCount);
+    bool isPriorBatchIndex(int32_t idx);
+    bool isPriorEntryIndex(int64_t idx);
     void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, 
BrokerConsumerStatsCallback);
 
     bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
@@ -187,11 +189,14 @@ class ConsumerImpl : public ConsumerImplBase,
                                        BrokerGetLastMessageIdCallback 
callback);
 
     Optional<MessageId> clearReceiveQueue();
+    void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& 
seekId, long timestamp,
+                           ResultCallback callback);
 
     std::mutex mutexForReceiveWithZeroQueueSize;
     const ConsumerConfiguration config_;
     const std::string subscription_;
     std::string originalSubscriptionName_;
+    const bool isPersistent_;
     MessageListener messageListener_;
     ConsumerEventListenerPtr eventListener_;
     ExecutorServicePtr listenerExecutor_;
@@ -220,12 +225,15 @@ class ConsumerImpl : public ConsumerImplBase,
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
 
-    // Make the access to `startMessageId_`, `lastDequedMessageId_` and 
`lastMessageIdInBroker_` thread safe
+    // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_` 
thread safe
     mutable std::mutex mutexForMessageId_;
-    Optional<MessageId> startMessageId_;
     MessageId lastDequedMessageId_{MessageId::earliest()};
     MessageId lastMessageIdInBroker_{MessageId::earliest()};
 
+    std::atomic_bool duringSeek_{false};
+    Synchronized<Optional<MessageId>> 
startMessageId_{Optional<MessageId>::empty()};
+    Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
+
     class ChunkedMessageCtx {
        public:
         ChunkedMessageCtx() : totalChunks_(0) {}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 75150762345..0d730e1561f 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -183,7 +183,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int 
numPartitions, TopicN
     if (numPartitions == 0) {
         // We don't have to add partition-n suffix
         consumer = std::make_shared<ConsumerImpl>(client_, 
topicName->toString(), subscriptionName_, config,
-                                                  internalListenerExecutor, 
true, NonPartitioned);
+                                                  topicName->isPersistent(), 
internalListenerExecutor, true,
+                                                  NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(std::bind(
             &MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(), std::placeholders::_1,
             std::placeholders::_2, partitionsNeedCreate, 
topicSubResultPromise));
@@ -195,7 +196,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int 
numPartitions, TopicN
         for (int i = 0; i < numPartitions; i++) {
             std::string topicPartitionName = 
topicName->getTopicPartitionName(i);
             consumer = std::make_shared<ConsumerImpl>(client_, 
topicPartitionName, subscriptionName_, config,
-                                                      
internalListenerExecutor, true, Partitioned);
+                                                      
topicName->isPersistent(), internalListenerExecutor,
+                                                      true, Partitioned);
             consumer->getConsumerCreatedFuture().addListener(std::bind(
                 &MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(),
                 std::placeholders::_1, std::placeholders::_2, 
partitionsNeedCreate, topicSubResultPromise));
@@ -819,7 +821,8 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
     std::string topicPartitionName = 
topicName->getTopicPartitionName(partitionIndex);
 
     auto consumer = std::make_shared<ConsumerImpl>(client_, 
topicPartitionName, subscriptionName_, config,
-                                                   internalListenerExecutor, 
true, Partitioned);
+                                                   topicName->isPersistent(), 
internalListenerExecutor, true,
+                                                   Partitioned);
     consumer->getConsumerCreatedFuture().addListener(
         std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(),
                   std::placeholders::_1, std::placeholders::_2, 
partitionsNeedCreate, topicSubResultPromise));
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc 
b/pulsar-client-cpp/lib/ReaderImpl.cc
index 5f78068228f..83fa6a57009 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -76,9 +76,10 @@ void ReaderImpl::start(const MessageId& startMessageId,
         test::consumerConfigOfReader = consumerConf.clone();
     }
 
-    consumer_ = std::make_shared<ConsumerImpl>(
-        client_.lock(), topic_, subscription, consumerConf, 
ExecutorServicePtr(), false, NonPartitioned,
-        Commands::SubscriptionModeNonDurable, 
Optional<MessageId>::of(startMessageId));
+    consumer_ = std::make_shared<ConsumerImpl>(client_.lock(), topic_, 
subscription, consumerConf,
+                                               
TopicName::get(topic_)->isPersistent(), ExecutorServicePtr(),
+                                               false, NonPartitioned, 
Commands::SubscriptionModeNonDurable,
+                                               
Optional<MessageId>::of(startMessageId));
     consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
     auto self = shared_from_this();
     consumer_->getConsumerCreatedFuture().addListener(
diff --git a/pulsar-client-cpp/lib/Synchronized.h 
b/pulsar-client-cpp/lib/Synchronized.h
new file mode 100644
index 00000000000..a98c08daeee
--- /dev/null
+++ b/pulsar-client-cpp/lib/Synchronized.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <mutex>
+
+template <typename T>
+class Synchronized {
+   public:
+    explicit Synchronized(const T& value) : value_(value) {}
+
+    T get() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return value_;
+    }
+
+    Synchronized& operator=(const T& value) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        value_ = value;
+        return *this;
+    }
+
+   private:
+    T value_;
+    mutable std::mutex mutex_;
+};
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index 5f46edc4365..375afe43adb 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -546,7 +546,7 @@ class PulsarTest(TestCase):
         # The reset would be effectively done on the next position relative to 
reset.
         # When available, we should test this behaviour with 
`startMessageIdInclusive` opt.
         from_msg_idx = last_msg_idx
-        for i in range(from_msg_idx, num_of_msgs):
+        for i in range(from_msg_idx + 1, num_of_msgs):
             msg = reader2.read_next(TM)
             self.assertTrue(msg)
             self.assertEqual(msg.data(), b"hello-%d" % i)
@@ -896,7 +896,7 @@ class PulsarTest(TestCase):
         consumer.seek(ids[50])
         time.sleep(0.5)
         msg = consumer.receive(TM)
-        self.assertEqual(msg.data(), b"hello-50")
+        self.assertEqual(msg.data(), b"hello-51")
 
         # ditto, but seek on timestamp
         consumer.seek(timestamps[42])
@@ -921,9 +921,9 @@ class PulsarTest(TestCase):
         reader.seek(ids[33])
         time.sleep(0.5)
         msg = reader.read_next(TM)
-        self.assertEqual(msg.data(), b"hello-33")
-        msg = reader.read_next(TM)
         self.assertEqual(msg.data(), b"hello-34")
+        msg = reader.read_next(TM)
+        self.assertEqual(msg.data(), b"hello-35")
 
         # seek on timestamp
         reader.seek(timestamps[79])
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc 
b/pulsar-client-cpp/tests/ConsumerTest.cc
index cf9ac23d190..c8a07e6c84b 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -868,4 +868,67 @@ TEST(ConsumerTest, 
testGetLastMessageIdBlockWhenConnectionDisconnected) {
     ASSERT_GE(elapsed.seconds(), operationTimeout);
 }
 
+class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
+   public:
+    void SetUp() override { producerConf_ = 
ProducerConfiguration().setBatchingEnabled(GetParam()); }
+
+    void TearDown() override { client_.close(); }
+
+   protected:
+    Client client_{lookupUrl};
+    ProducerConfiguration producerConf_;
+};
+
+TEST_P(ConsumerSeekTest, testSeekForMessageId) {
+    Client client(lookupUrl);
+
+    const std::string topic = "test-seek-for-message-id-" + 
std::string((GetParam() ? "batch-" : "")) +
+                              std::to_string(time(nullptr));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer));
+
+    const auto numMessages = 100;
+    MessageId seekMessageId;
+
+    int r = (rand() % (numMessages - 1));
+    for (int i = 0; i < numMessages; i++) {
+        MessageId id;
+        ASSERT_EQ(ResultOk,
+                  producer.send(MessageBuilder().setContent("msg-" + 
std::to_string(i)).build(), id));
+
+        if (i == r) {
+            seekMessageId = id;
+        }
+    }
+
+    LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r);
+
+    Consumer consumerExclusive;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive));
+    consumerExclusive.seek(seekMessageId);
+    Message msg0;
+    ASSERT_EQ(ResultOk, consumerExclusive.receive(msg0, 3000));
+
+    Consumer consumerInclusive;
+    ASSERT_EQ(ResultOk,
+              client.subscribe(topic, "sub-1", 
ConsumerConfiguration().setStartMessageIdInclusive(true),
+                               consumerInclusive));
+    consumerInclusive.seek(seekMessageId);
+    Message msg1;
+    ASSERT_EQ(ResultOk, consumerInclusive.receive(msg1, 3000));
+
+    LOG_INFO("consumerExclusive received " << msg0.getDataAsString() << " from 
" << msg0.getMessageId());
+    LOG_INFO("consumerInclusive received " << msg1.getDataAsString() << " from 
" << msg1.getMessageId());
+
+    ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1));
+    ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r));
+
+    consumerInclusive.close();
+    consumerExclusive.close();
+    producer.close();
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, 
false));
+
 }  // namespace pulsar

Reply via email to