This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7e4c74671f0 [feature][client-cpp] Support inclusive seek for cpp
client (#17209)
7e4c74671f0 is described below
commit 7e4c74671f0f13f0a5d860d19655f09c0a351f58
Author: Kai Wang <[email protected]>
AuthorDate: Wed Sep 28 10:08:24 2022 +0800
[feature][client-cpp] Support inclusive seek for cpp client (#17209)
Fixes #17186
### Motivation
There are some cases in which it is useful to be able to include current
position of the message when reset of cursor was made.
### Modifications
* Support inclusive seek in c++ consumers.
* Add a unit test to verify.
---
.../include/pulsar/ConsumerConfiguration.h | 14 ++
pulsar-client-cpp/lib/ClientImpl.cc | 4 +-
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 7 +
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 1 +
pulsar-client-cpp/lib/ConsumerImpl.cc | 165 +++++++++++++--------
pulsar-client-cpp/lib/ConsumerImpl.h | 16 +-
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 9 +-
pulsar-client-cpp/lib/ReaderImpl.cc | 7 +-
pulsar-client-cpp/lib/Synchronized.h | 42 ++++++
pulsar-client-cpp/python/pulsar_test.py | 8 +-
pulsar-client-cpp/tests/ConsumerTest.cc | 63 ++++++++
11 files changed, 257 insertions(+), 79 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index b326ca8fb31..4347c3b2d5f 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -499,6 +499,20 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
bool isAutoAckOldestChunkedMessageOnQueueFull() const;
+ /**
+ * Set the consumer to include the given position of any reset operation
like Consumer::seek.
+ *
+ * Default: false
+ *
+ * @param startMessageIdInclusive whether to include the reset position
+ */
+ ConsumerConfiguration& setStartMessageIdInclusive(bool
startMessageIdInclusive);
+
+ /**
+ * The associated getter of setStartMessageIdInclusive
+ */
+ bool isStartMessageIdInclusive() const;
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc
b/pulsar-client-cpp/lib/ClientImpl.cc
index 08adb1d6423..29e92f3b815 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -378,8 +378,8 @@ void ClientImpl::handleSubscribe(const Result result, const
LookupDataResultPtr
partitionMetadata->getPartitions(),
subscriptionName, conf, lookupServicePtr_);
} else {
- auto consumerImpl =
std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
-
subscriptionName, conf);
+ auto consumerImpl = std::make_shared<ConsumerImpl>(
+ shared_from_this(), topicName->toString(), subscriptionName,
conf, topicName->isPersistent());
consumerImpl->setPartitionIndex(topicName->getPartitionIndex());
consumer = consumerImpl;
}
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 2b58835cdbe..f9fe499b954 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -260,4 +260,11 @@ bool
ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
return impl_->autoAckOldestChunkedMessageOnQueueFull;
}
+ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool
startMessageIdInclusive) {
+ impl_->startMessageIdInclusive = startMessageIdInclusive;
+ return *this;
+}
+
+bool ConsumerConfiguration::isStartMessageIdInclusive() const { return
impl_->startMessageIdInclusive; }
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 1c13f729b55..cca83a38829 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -53,6 +53,7 @@ struct ConsumerConfigurationImpl {
KeySharedPolicy keySharedPolicy;
size_t maxPendingChunkedMessage{10};
bool autoAckOldestChunkedMessageOnQueueFull{false};
+ bool startMessageIdInclusive{false};
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 79c20d84649..37fcd95248a 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -38,6 +38,7 @@ DECLARE_LOG_OBJECT()
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string&
topic,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
+ bool isPersistent,
const ExecutorServicePtr listenerExecutor /* = NULL
by default */,
bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* =
NonPartitioned by default */,
@@ -47,6 +48,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
config_(conf),
subscription_(subscriptionName),
originalSubscriptionName_(subscriptionName),
+ isPersistent_(isPersistent),
messageListener_(config_.getMessageListener()),
eventListener_(config_.getConsumerEventListener()),
hasParent_(hasParent),
@@ -169,14 +171,17 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
// sending the subscribe request.
cnx->registerConsumer(consumerId_, shared_from_this());
- Lock lockForMessageId(mutexForMessageId_);
- Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
- if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
- // Update startMessageId so that we can discard messages after delivery
- // restarts
- startMessageId_ = firstMessageInQueue;
+ if (duringSeek_) {
+ ackGroupingTrackerPtr_->flushAndClean();
}
- const auto startMessageId = startMessageId_;
+
+ Lock lockForMessageId(mutexForMessageId_);
+ // Update startMessageId so that we can discard messages after delivery
restarts
+ const auto startMessageId = clearReceiveQueue();
+ const auto subscribeMessageId = (subscriptionMode_ ==
Commands::SubscriptionModeNonDurable)
+ ? startMessageId
+ : Optional<MessageId>::empty();
+ startMessageId_ = startMessageId;
lockForMessageId.unlock();
unAckedMessageTrackerPtr_->clear();
@@ -186,7 +191,7 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(),
consumerName_, subscriptionMode_,
- startMessageId, readCompacted_, config_.getProperties(),
config_.getSubscriptionProperties(),
+ subscribeMessageId, readCompacted_, config_.getProperties(),
config_.getSubscriptionProperties(),
config_.getSchema(), getInitialPosition(),
config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());
cnx->sendRequestWithId(cmd, requestId)
@@ -397,12 +402,12 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
return;
}
- const bool isMessageDecryptable =
- metadata.encryption_keys_size() <= 0 ||
config_.getCryptoKeyReader().get() ||
+ const bool isMessageUndecryptable =
+ metadata.encryption_keys_size() > 0 &&
!config_.getCryptoKeyReader().get() &&
config_.getCryptoFailureAction() ==
ConsumerCryptoFailureAction::CONSUME;
const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
- if (isMessageDecryptable && !isChunkedMessage) {
+ if (!isMessageUndecryptable && !isChunkedMessage) {
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata,
payload, true)) {
// Message was discarded on decompression error
return;
@@ -446,6 +451,16 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
Lock lock(mutex_);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m,
msg.redelivery_count());
} else {
+ const auto startMessageId = startMessageId_.get();
+ if (isPersistent_ && startMessageId.is_present() &&
+ m.getMessageId().ledgerId() == startMessageId.value().ledgerId() &&
+ m.getMessageId().entryId() == startMessageId.value().entryId() &&
+ isPriorEntryIndex(m.getMessageId().entryId())) {
+ LOG_DEBUG(getName() << " Ignoring message from before the
startMessageId: "
+ << startMessageId.value());
+ return;
+ }
+
Lock lock(pendingReceiveMutex_);
// if asyncReceive is waiting then notify callback without adding to
incomingMessages queue
bool asyncReceivedWaiting = !pendingReceives_.empty();
@@ -533,9 +548,7 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
batchAcknowledgementTracker_.receivedMessage(batchedMessage);
LOG_DEBUG("Received Batch messages of size - " << batchSize
<< " -- msgId: " <<
batchedMessage.getMessageId());
- Lock lock(mutexForMessageId_);
- const auto startMessageId = startMessageId_;
- lock.unlock();
+ const auto startMessageId = startMessageId_.get();
int skippedMessages = 0;
@@ -550,9 +563,9 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
// If we are receiving a batch message, we need to discard
messages that were prior
// to the startMessageId
- if (msgId.ledgerId() == startMessageId.value().ledgerId() &&
+ if (isPersistent_ && msgId.ledgerId() ==
startMessageId.value().ledgerId() &&
msgId.entryId() == startMessageId.value().entryId() &&
- msgId.batchIndex() <= startMessageId.value().batchIndex()) {
+ isPriorBatchIndex(msgId.batchIndex())) {
LOG_DEBUG(getName() << "Ignoring message from before the
startMessageId"
<< msg.getMessageId());
++skippedMessages;
@@ -842,6 +855,12 @@ void ConsumerImpl::messageProcessed(Message& msg, bool
track) {
* not seen by the application
*/
Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
+ bool expectedDuringSeek = true;
+ if (duringSeek_.compare_exchange_strong(expectedDuringSeek, false)) {
+ return Optional<MessageId>::of(seekMessageId_.get());
+ } else if (subscriptionMode_ == Commands::SubscriptionModeDurable) {
+ return startMessageId_.get();
+ }
Message nextMessageInQueue;
if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
// There was at least one message pending in the queue
@@ -862,7 +881,7 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
} else {
// No message was received or dequeued by this consumer. Next message
would still be the
// startMessageId
- return startMessageId_;
+ return startMessageId_.get();
}
}
@@ -1175,18 +1194,6 @@ void ConsumerImpl::brokerConsumerStatsListener(Result
res, BrokerConsumerStatsIm
}
}
-void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
- if (result == ResultOk) {
- Lock lock(mutexForMessageId_);
- lastDequedMessageId_ = MessageId::earliest();
- lock.unlock();
- LOG_INFO(getName() << "Seek successfully");
- } else {
- LOG_ERROR(getName() << "Failed to seek: " << strResult(result));
- }
- callback(result);
-}
-
void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
const auto state = state_.load();
if (state == Closed || state == Closing) {
@@ -1197,25 +1204,13 @@ void ConsumerImpl::seekAsync(const MessageId& msgId,
ResultCallback callback) {
return;
}
- this->ackGroupingTrackerPtr_->flushAndClean();
- ClientConnectionPtr cnx = getCnx().lock();
- if (cnx) {
- ClientImplPtr client = client_.lock();
- uint64_t requestId = client->newRequestId();
- LOG_DEBUG(getName() << " Sending seek Command for Consumer - " <<
getConsumerId() << ", requestId - "
- << requestId);
- Future<Result, ResponseData> future =
- cnx->sendRequestWithId(Commands::newSeek(consumerId_, requestId,
msgId), requestId);
-
- if (callback) {
- future.addListener(
- std::bind(&ConsumerImpl::handleSeek, shared_from_this(),
std::placeholders::_1, callback));
- }
+ ClientImplPtr client = client_.lock();
+ if (!client) {
+ LOG_ERROR(getName() << "Client is expired when seekAsync " << msgId);
return;
}
-
- LOG_ERROR(getName() << " Client Connection not ready for Consumer");
- callback(ResultNotConnected);
+ const auto requestId = client->newRequestId();
+ seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId,
msgId), msgId, 0L, callback);
}
void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
@@ -1228,24 +1223,14 @@ void ConsumerImpl::seekAsync(uint64_t timestamp,
ResultCallback callback) {
return;
}
- ClientConnectionPtr cnx = getCnx().lock();
- if (cnx) {
- ClientImplPtr client = client_.lock();
- uint64_t requestId = client->newRequestId();
- LOG_DEBUG(getName() << " Sending seek Command for Consumer - " <<
getConsumerId() << ", requestId - "
- << requestId);
- Future<Result, ResponseData> future =
- cnx->sendRequestWithId(Commands::newSeek(consumerId_, requestId,
timestamp), requestId);
-
- if (callback) {
- future.addListener(
- std::bind(&ConsumerImpl::handleSeek, shared_from_this(),
std::placeholders::_1, callback));
- }
+ ClientImplPtr client = client_.lock();
+ if (!client) {
+ LOG_ERROR(getName() << "Client is expired when seekAsync " <<
timestamp);
return;
}
-
- LOG_ERROR(getName() << " Client Connection not ready for Consumer");
- callback(ResultNotConnected);
+ const auto requestId = client->newRequestId();
+ seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId,
timestamp), MessageId::earliest(),
+ timestamp, callback);
}
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1255,9 +1240,10 @@ inline bool hasMoreMessages(const MessageId&
lastMessageIdInBroker, const Messag
}
void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback
callback) {
+ const auto startMessageId = startMessageId_.get();
Lock lock(mutexForMessageId_);
const auto messageId =
- (lastDequedMessageId_ == MessageId::earliest()) ?
startMessageId_.value() : lastDequedMessageId_;
+ (lastDequedMessageId_ == MessageId::earliest()) ?
startMessageId.value() : lastDequedMessageId_;
if (messageId == MessageId::latest()) {
lock.unlock();
@@ -1380,4 +1366,57 @@ bool ConsumerImpl::isConnected() const { return
!getCnx().expired() && state_ ==
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ?
1 : 0; }
+void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const
MessageId& seekId,
+ long timestamp, ResultCallback callback) {
+ ClientConnectionPtr cnx = getCnx().lock();
+ if (!cnx) {
+ LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+ callback(ResultNotConnected);
+ return;
+ }
+
+ const auto originalSeekMessageId = seekMessageId_.get();
+ seekMessageId_ = seekId;
+ duringSeek_ = true;
+ if (timestamp > 0) {
+ LOG_INFO(getName() << " Seeking subscription to " << timestamp);
+ } else {
+ LOG_INFO(getName() << " Seeking subscription to " << seekId);
+ }
+
+ std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this()};
+
+ cnx->sendRequestWithId(seek, requestId)
+ .addListener([this, weakSelf, callback, originalSeekMessageId](Result
result,
+ const
ResponseData& responseData) {
+ auto self = weakSelf.lock();
+ if (!self) {
+ callback(result);
+ return;
+ }
+ if (result == ResultOk) {
+ LOG_INFO(getName() << "Seek successfully");
+ ackGroupingTrackerPtr_->flushAndClean();
+ Lock lock(mutexForMessageId_);
+ lastDequedMessageId_ = MessageId::earliest();
+ lock.unlock();
+ } else {
+ LOG_ERROR(getName() << "Failed to seek: " << result);
+ seekMessageId_ = originalSeekMessageId;
+ duringSeek_ = false;
+ }
+ callback(result);
+ });
+}
+
+bool ConsumerImpl::isPriorBatchIndex(int32_t idx) {
+ return config_.isStartMessageIdInclusive() ? idx <
startMessageId_.get().value().batchIndex()
+ : idx <=
startMessageId_.get().value().batchIndex();
+}
+
+bool ConsumerImpl::isPriorEntryIndex(int64_t idx) {
+ return config_.isStartMessageIdInclusive() ? idx <
startMessageId_.get().value().entryId()
+ : idx <=
startMessageId_.get().value().entryId();
+}
+
} /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h
b/pulsar-client-cpp/lib/ConsumerImpl.h
index 70fda0170cc..1ad3a4c3727 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -46,6 +46,7 @@
#include <lib/stats/ConsumerStatsDisabled.h>
#include <queue>
#include <atomic>
+#include "Synchronized.h"
using namespace pulsar;
@@ -69,7 +70,7 @@ class ConsumerImpl : public ConsumerImplBase,
public std::enable_shared_from_this<ConsumerImpl> {
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const
std::string& subscriptionName,
- const ConsumerConfiguration&,
+ const ConsumerConfiguration&, bool isPersistent,
const ExecutorServicePtr listenerExecutor =
ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode =
Commands::SubscriptionModeDurable,
@@ -138,7 +139,6 @@ class ConsumerImpl : public ConsumerImplBase,
virtual void redeliverMessages(const std::set<MessageId>& messageIds);
- void handleSeek(Result result, ResultCallback callback);
virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback
callback);
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback
callback);
@@ -169,6 +169,8 @@ class ConsumerImpl : public ConsumerImplBase,
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr&
cnx, Message& batchedMessage,
int redeliveryCount);
+ bool isPriorBatchIndex(int32_t idx);
+ bool isPriorEntryIndex(int64_t idx);
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl,
BrokerConsumerStatsCallback);
bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
proto::CommandMessage& msg,
@@ -187,11 +189,14 @@ class ConsumerImpl : public ConsumerImplBase,
BrokerGetLastMessageIdCallback
callback);
Optional<MessageId> clearReceiveQueue();
+ void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId&
seekId, long timestamp,
+ ResultCallback callback);
std::mutex mutexForReceiveWithZeroQueueSize;
const ConsumerConfiguration config_;
const std::string subscription_;
std::string originalSubscriptionName_;
+ const bool isPersistent_;
MessageListener messageListener_;
ConsumerEventListenerPtr eventListener_;
ExecutorServicePtr listenerExecutor_;
@@ -220,12 +225,15 @@ class ConsumerImpl : public ConsumerImplBase,
MessageCryptoPtr msgCrypto_;
const bool readCompacted_;
- // Make the access to `startMessageId_`, `lastDequedMessageId_` and
`lastMessageIdInBroker_` thread safe
+ // Make the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`
thread safe
mutable std::mutex mutexForMessageId_;
- Optional<MessageId> startMessageId_;
MessageId lastDequedMessageId_{MessageId::earliest()};
MessageId lastMessageIdInBroker_{MessageId::earliest()};
+ std::atomic_bool duringSeek_{false};
+ Synchronized<Optional<MessageId>>
startMessageId_{Optional<MessageId>::empty()};
+ Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
+
class ChunkedMessageCtx {
public:
ChunkedMessageCtx() : totalChunks_(0) {}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 75150762345..0d730e1561f 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -183,7 +183,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int
numPartitions, TopicN
if (numPartitions == 0) {
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client_,
topicName->toString(), subscriptionName_, config,
- internalListenerExecutor,
true, NonPartitioned);
+ topicName->isPersistent(),
internalListenerExecutor, true,
+ NonPartitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, partitionsNeedCreate,
topicSubResultPromise));
@@ -195,7 +196,8 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int
numPartitions, TopicN
for (int i = 0; i < numPartitions; i++) {
std::string topicPartitionName =
topicName->getTopicPartitionName(i);
consumer = std::make_shared<ConsumerImpl>(client_,
topicPartitionName, subscriptionName_, config,
-
internalListenerExecutor, true, Partitioned);
+
topicName->isPersistent(), internalListenerExecutor,
+ true, Partitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(),
std::placeholders::_1, std::placeholders::_2,
partitionsNeedCreate, topicSubResultPromise));
@@ -819,7 +821,8 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
std::string topicPartitionName =
topicName->getTopicPartitionName(partitionIndex);
auto consumer = std::make_shared<ConsumerImpl>(client_,
topicPartitionName, subscriptionName_, config,
- internalListenerExecutor,
true, Partitioned);
+ topicName->isPersistent(),
internalListenerExecutor, true,
+ Partitioned);
consumer->getConsumerCreatedFuture().addListener(
std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(),
std::placeholders::_1, std::placeholders::_2,
partitionsNeedCreate, topicSubResultPromise));
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc
b/pulsar-client-cpp/lib/ReaderImpl.cc
index 5f78068228f..83fa6a57009 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -76,9 +76,10 @@ void ReaderImpl::start(const MessageId& startMessageId,
test::consumerConfigOfReader = consumerConf.clone();
}
- consumer_ = std::make_shared<ConsumerImpl>(
- client_.lock(), topic_, subscription, consumerConf,
ExecutorServicePtr(), false, NonPartitioned,
- Commands::SubscriptionModeNonDurable,
Optional<MessageId>::of(startMessageId));
+ consumer_ = std::make_shared<ConsumerImpl>(client_.lock(), topic_,
subscription, consumerConf,
+
TopicName::get(topic_)->isPersistent(), ExecutorServicePtr(),
+ false, NonPartitioned,
Commands::SubscriptionModeNonDurable,
+
Optional<MessageId>::of(startMessageId));
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
diff --git a/pulsar-client-cpp/lib/Synchronized.h
b/pulsar-client-cpp/lib/Synchronized.h
new file mode 100644
index 00000000000..a98c08daeee
--- /dev/null
+++ b/pulsar-client-cpp/lib/Synchronized.h
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <mutex>
+
+template <typename T>
+class Synchronized {
+ public:
+ explicit Synchronized(const T& value) : value_(value) {}
+
+ T get() const {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return value_;
+ }
+
+ Synchronized& operator=(const T& value) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ value_ = value;
+ return *this;
+ }
+
+ private:
+ T value_;
+ mutable std::mutex mutex_;
+};
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index 5f46edc4365..375afe43adb 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -546,7 +546,7 @@ class PulsarTest(TestCase):
# The reset would be effectively done on the next position relative to
reset.
# When available, we should test this behaviour with
`startMessageIdInclusive` opt.
from_msg_idx = last_msg_idx
- for i in range(from_msg_idx, num_of_msgs):
+ for i in range(from_msg_idx + 1, num_of_msgs):
msg = reader2.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b"hello-%d" % i)
@@ -896,7 +896,7 @@ class PulsarTest(TestCase):
consumer.seek(ids[50])
time.sleep(0.5)
msg = consumer.receive(TM)
- self.assertEqual(msg.data(), b"hello-50")
+ self.assertEqual(msg.data(), b"hello-51")
# ditto, but seek on timestamp
consumer.seek(timestamps[42])
@@ -921,9 +921,9 @@ class PulsarTest(TestCase):
reader.seek(ids[33])
time.sleep(0.5)
msg = reader.read_next(TM)
- self.assertEqual(msg.data(), b"hello-33")
- msg = reader.read_next(TM)
self.assertEqual(msg.data(), b"hello-34")
+ msg = reader.read_next(TM)
+ self.assertEqual(msg.data(), b"hello-35")
# seek on timestamp
reader.seek(timestamps[79])
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc
b/pulsar-client-cpp/tests/ConsumerTest.cc
index cf9ac23d190..c8a07e6c84b 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -868,4 +868,67 @@ TEST(ConsumerTest,
testGetLastMessageIdBlockWhenConnectionDisconnected) {
ASSERT_GE(elapsed.seconds(), operationTimeout);
}
+class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
+ public:
+ void SetUp() override { producerConf_ =
ProducerConfiguration().setBatchingEnabled(GetParam()); }
+
+ void TearDown() override { client_.close(); }
+
+ protected:
+ Client client_{lookupUrl};
+ ProducerConfiguration producerConf_;
+};
+
+TEST_P(ConsumerSeekTest, testSeekForMessageId) {
+ Client client(lookupUrl);
+
+ const std::string topic = "test-seek-for-message-id-" +
std::string((GetParam() ? "batch-" : "")) +
+ std::to_string(time(nullptr));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer));
+
+ const auto numMessages = 100;
+ MessageId seekMessageId;
+
+ int r = (rand() % (numMessages - 1));
+ for (int i = 0; i < numMessages; i++) {
+ MessageId id;
+ ASSERT_EQ(ResultOk,
+ producer.send(MessageBuilder().setContent("msg-" +
std::to_string(i)).build(), id));
+
+ if (i == r) {
+ seekMessageId = id;
+ }
+ }
+
+ LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r);
+
+ Consumer consumerExclusive;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive));
+ consumerExclusive.seek(seekMessageId);
+ Message msg0;
+ ASSERT_EQ(ResultOk, consumerExclusive.receive(msg0, 3000));
+
+ Consumer consumerInclusive;
+ ASSERT_EQ(ResultOk,
+ client.subscribe(topic, "sub-1",
ConsumerConfiguration().setStartMessageIdInclusive(true),
+ consumerInclusive));
+ consumerInclusive.seek(seekMessageId);
+ Message msg1;
+ ASSERT_EQ(ResultOk, consumerInclusive.receive(msg1, 3000));
+
+ LOG_INFO("consumerExclusive received " << msg0.getDataAsString() << " from
" << msg0.getMessageId());
+ LOG_INFO("consumerInclusive received " << msg1.getDataAsString() << " from
" << msg1.getMessageId());
+
+ ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1));
+ ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r));
+
+ consumerInclusive.close();
+ consumerExclusive.close();
+ producer.close();
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true,
false));
+
} // namespace pulsar