This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new aeff955  Fix startMessageInclusive does not work if the 1st message is 
a chunked message (#462)
aeff955 is described below

commit aeff955f2a5e3d87532b606bab02f4067b795bd5
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Dec 12 18:30:31 2024 +0800

    Fix startMessageInclusive does not work if the 1st message is a chunked 
message (#462)
---
 lib/ConsumerImpl.cc | 27 +++++++++++++++++++++++++--
 tests/ReaderTest.cc | 29 +++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 289bd34..d540849 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -59,6 +59,21 @@ DECLARE_LOG_OBJECT()
 using std::chrono::milliseconds;
 using std::chrono::seconds;
 
+static boost::optional<MessageId> getStartMessageId(const 
boost::optional<MessageId>& startMessageId,
+                                                    bool inclusive) {
+    if (!inclusive || !startMessageId) {
+        return startMessageId;
+    }
+    // The default ledger id and entry id of a chunked message refer to the 
fields of the last chunk. When the
+    // start message id is inclusive, we need to start from the first chunk.
+    auto chunkMsgIdImpl =
+        dynamic_cast<const 
ChunkMessageIdImpl*>(Commands::getMessageIdImpl(startMessageId.value()).get());
+    if (chunkMsgIdImpl) {
+        return 
boost::optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds().front()};
+    }
+    return startMessageId;
+}
+
 ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& 
topic,
                            const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
                            bool isPersistent, const ConsumerInterceptorsPtr& 
interceptors,
@@ -91,7 +106,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       messageListenerRunning_(!conf.isStartPaused()),
       negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, 
*this, conf)),
       readCompacted_(conf.isReadCompacted()),
-      startMessageId_(startMessageId),
+      startMessageId_(getStartMessageId(startMessageId, 
conf.isStartMessageIdInclusive())),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
       
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
       
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -469,7 +484,15 @@ boost::optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuff
 
     auto& chunkedMsgCtx = it->second;
     if (it == chunkedMessageCache_.end() || 
!chunkedMsgCtx.validateChunkId(chunkId)) {
-        if (it == chunkedMessageCache_.end()) {
+        auto startMessageId = 
startMessageId_.get().value_or(MessageId::earliest());
+        if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() 
== messageId.ledgerId() &&
+            startMessageId.entryId() == messageId.entryId()) {
+            // When the start message id is not inclusive, the last chunk of 
the previous chunked message will
+            // be delivered, which is expected, and we only need to filter it 
out.
+            chunkedMessageCache_.remove(uuid);
+            LOG_INFO("Filtered the chunked message before the start message id 
(uuid: "
+                     << uuid << " chunkId: " << chunkId << ", messageId: " << 
messageId << ")");
+        } else if (it == chunkedMessageCache_.end()) {
             LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " 
chunkId: " << chunkId
                                                            << ", messageId: " 
<< messageId << ")");
         } else {
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 92fdf62..ad81949 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -888,5 +888,34 @@ TEST_P(ReaderSeekTest, 
testHasMessageAvailableAfterSeekTimestamp) {
     }
 }
 
+TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
+    const auto topic = "test-seek-inclusive-chunk-message-" + 
std::to_string(time(nullptr));
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setBatchingEnabled(false);
+    producerConf.setChunkingEnabled(true);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+
+    std::string largeValue(1024 * 1024 * 6, 'a');
+    MessageId firstMsgId;
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(largeValue).build(), firstMsgId));
+    MessageId secondMsgId;
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(largeValue).build(), secondMsgId));
+
+    auto assertStartMessageId = [&](bool inclusive, MessageId expectedMsgId) {
+        Reader reader;
+        ReaderConfiguration readerConf;
+        readerConf.setStartMessageIdInclusive(inclusive);
+        ASSERT_EQ(ResultOk, client.createReader(topic, firstMsgId, readerConf, 
reader));
+        Message msg;
+        ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
+        ASSERT_EQ(expectedMsgId, msg.getMessageId());
+        ASSERT_EQ(ResultOk, reader.close());
+    };
+    assertStartMessageId(true, firstMsgId);
+    assertStartMessageId(false, secondMsgId);
+}
+
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, 
false));

Reply via email to