This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new aeff955 Fix startMessageInclusive does not work if the 1st message is
a chunked message (#462)
aeff955 is described below
commit aeff955f2a5e3d87532b606bab02f4067b795bd5
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Dec 12 18:30:31 2024 +0800
Fix startMessageInclusive does not work if the 1st message is a chunked
message (#462)
---
lib/ConsumerImpl.cc | 27 +++++++++++++++++++++++++--
tests/ReaderTest.cc | 29 +++++++++++++++++++++++++++++
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 289bd34..d540849 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -59,6 +59,21 @@ DECLARE_LOG_OBJECT()
using std::chrono::milliseconds;
using std::chrono::seconds;
+/**
+ * Resolve the message id a consumer should really start from.
+ *
+ * @param startMessageId the start message id requested by the caller; may be empty
+ * @param inclusive whether the start position is configured as inclusive
+ * @return `startMessageId` unchanged when it is empty, when `inclusive` is false, or when it does not refer
+ *   to a chunked message; otherwise the message id of the first chunk of that chunked message
+ */
+static boost::optional<MessageId> getStartMessageId(const boost::optional<MessageId>& startMessageId,
+                                                    bool inclusive) {
+    if (!inclusive || !startMessageId) {
+        return startMessageId;
+    }
+    // The default ledger id and entry id of a chunked message refer to the fields of the last chunk. When
+    // the start message id is inclusive, we need to start from the first chunk.
+    auto chunkMsgIdImpl =
+        dynamic_cast<const ChunkMessageIdImpl*>(Commands::getMessageIdImpl(startMessageId.value()).get());
+    // Calling front() on an empty container is undefined behavior, so fall back to the original id if the
+    // chunk list is unexpectedly empty.
+    if (chunkMsgIdImpl && !chunkMsgIdImpl->getChunkedMessageIds().empty()) {
+        return boost::optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds().front()};
+    }
+    return startMessageId;
+}
+
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string&
topic,
const std::string& subscriptionName, const
ConsumerConfiguration& conf,
bool isPersistent, const ConsumerInterceptorsPtr&
interceptors,
@@ -91,7 +106,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
messageListenerRunning_(!conf.isStartPaused()),
negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client,
*this, conf)),
readCompacted_(conf.isReadCompacted()),
- startMessageId_(startMessageId),
+ startMessageId_(getStartMessageId(startMessageId,
conf.isStartMessageIdInclusive())),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -469,7 +484,15 @@ boost::optional<SharedBuffer>
ConsumerImpl::processMessageChunk(const SharedBuff
auto& chunkedMsgCtx = it->second;
if (it == chunkedMessageCache_.end() ||
!chunkedMsgCtx.validateChunkId(chunkId)) {
- if (it == chunkedMessageCache_.end()) {
+ auto startMessageId =
startMessageId_.get().value_or(MessageId::earliest());
+ if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId()
== messageId.ledgerId() &&
+ startMessageId.entryId() == messageId.entryId()) {
+ // When the start message id is not inclusive, the last chunk of
the previous chunked message will
+ // be delivered, which is expected and we only need to filter it
out.
+ chunkedMessageCache_.remove(uuid);
+ LOG_INFO("Filtered the chunked message before the start message id
(uuid: "
+ << uuid << " chunkId: " << chunkId << ", messageId: " <<
messageId << ")");
+ } else if (it == chunkedMessageCache_.end()) {
LOG_ERROR("Received an uncached chunk (uuid: " << uuid << "
chunkId: " << chunkId
<< ", messageId: "
<< messageId << ")");
} else {
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 92fdf62..ad81949 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -888,5 +888,34 @@ TEST_P(ReaderSeekTest,
testHasMessageAvailableAfterSeekTimestamp) {
}
}
+// Checks that a reader created at the id of the first of two large messages honors
+// startMessageIdInclusive: it reads the first message when inclusive and the second one otherwise.
+TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
+    const std::string topicName = "test-seek-inclusive-chunk-message-" + std::to_string(time(nullptr));
+
+    // Producer with batching disabled and chunking enabled.
+    ProducerConfiguration chunkingConf;
+    chunkingConf.setBatchingEnabled(false);
+    chunkingConf.setChunkingEnabled(true);
+    Producer chunkProducer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, chunkingConf, chunkProducer));
+
+    // Publish the same 6 MiB payload twice and remember both message ids.
+    // NOTE(review): presumably large enough to be split into several chunks; the size limit is not
+    // visible in this test.
+    std::string payload(1024 * 1024 * 6, 'a');
+    MessageId firstMsgId;
+    MessageId secondMsgId;
+    ASSERT_EQ(ResultOk, chunkProducer.send(MessageBuilder().setContent(payload).build(), firstMsgId));
+    ASSERT_EQ(ResultOk, chunkProducer.send(MessageBuilder().setContent(payload).build(), secondMsgId));
+
+    // Creates a reader starting at firstMsgId and verifies the id of the first message it returns.
+    // A failing ASSERT_* only returns from this lambda, so both scenarios below are always executed.
+    auto verifyFirstReadMessage = [&](bool inclusive, const MessageId& expectedMsgId) {
+        ReaderConfiguration readerConf;
+        readerConf.setStartMessageIdInclusive(inclusive);
+        Reader reader;
+        ASSERT_EQ(ResultOk, client.createReader(topicName, firstMsgId, readerConf, reader));
+        Message received;
+        ASSERT_EQ(ResultOk, reader.readNext(received, 3000));
+        ASSERT_EQ(expectedMsgId, received.getMessageId());
+        ASSERT_EQ(ResultOk, reader.close());
+    };
+    verifyFirstReadMessage(true, firstMsgId);
+    verifyFirstReadMessage(false, secondMsgId);
+}
+
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true,
false));