This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 23b60d1  Fix incorrect last sequence id when sending messages in batch (#546)
23b60d1 is described below

commit 23b60d1fe264fd48775fae7c7ebabae355d3c2a9
Author: zhanglistar <[email protected]>
AuthorDate: Thu Mar 5 22:29:13 2026 +0800

    Fix incorrect last sequence id when sending messages in batch (#546)
---
 lib/Commands.cc               |  4 +++-
 lib/ProducerImpl.cc           | 24 ++++++++++++++----------
 tests/KeyBasedBatchingTest.cc |  9 +++++----
 tests/ProducerTest.cc         | 38 ++++++++++++++++++++++++++++++++++++++
 tests/ReaderTest.cc           | 15 +++++++++++++--
 5 files changed, 73 insertions(+), 17 deletions(-)

diff --git a/lib/Commands.cc b/lib/Commands.cc
index 30f5bf1..08dc718 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
         batchPayload.write(payload.data(), payload.readableBytes());
     }
 
-    return messages.back().impl_->metadata.sequence_id();
+    // Use the first message's sequence_id so that ackReceived can compute
+    // lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly.
+    return messages.front().impl_->metadata.sequence_id();
 }
 
 Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, 
int32_t batchIndex,
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 360e128..c9a16e8 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -933,19 +933,24 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
         return false;
     }
 
-    uint64_t expectedSequenceId = op.sendArgs->sequenceId;
-    if (sequenceId > expectedSequenceId) {
-        LOG_WARN(getName() << "Got ack for msg " << sequenceId                 
       //
-                           << " expecting: " << expectedSequenceId << " queue 
size="  //
-                           << pendingMessagesQueue_.size() << " producer: " << 
producerId_);
+    const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId;
+    const uint64_t expectedLastSequenceId = expectedFirstSequenceId + 
op.messagesCount - 1;
+    // Broker may ack with either the first or the last sequence id of the batch.
+    if (sequenceId > expectedLastSequenceId) {
+        LOG_WARN(getName() << "Got ack for msg " << sequenceId
+                           << " expecting last: " << expectedLastSequenceId
+                           << " queue size=" << pendingMessagesQueue_.size() 
<< " producer: " << producerId_);
         return false;
-    } else if (sequenceId < expectedSequenceId) {
+    }
+    if (sequenceId < expectedFirstSequenceId) {
         // Ignoring the ack since it's referring to a message that has already timed out.
-        LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId  //
-                            << " -- MessageId - " << messageId << " last-seq: 
" << expectedSequenceId
-                            << " producer: " << producerId_);
+        LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " 
-- MessageId - " << messageId
+                            << " first-seq: " << expectedFirstSequenceId << " 
producer: " << producerId_);
         return true;
     }
+    // sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op.
+    const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId);
+    lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : 
sequenceId;
 
     // Message was persisted correctly
     LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
@@ -960,7 +965,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
     }
 
     releaseSemaphoreForSendOp(op);
-    lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
 
     std::unique_ptr<OpSendMsg> 
opSendMsg{pendingMessagesQueue_.front().release()};
     pendingMessagesQueue_.pop_front();
diff --git a/tests/KeyBasedBatchingTest.cc b/tests/KeyBasedBatchingTest.cc
index e596266..d5c5ce7 100644
--- a/tests/KeyBasedBatchingTest.cc
+++ b/tests/KeyBasedBatchingTest.cc
@@ -134,10 +134,11 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
     sendAsync("B", "3");
     sendAsync("C", "4");
     sendAsync("A", "5");
-    // sequence id: B < C < A, so there are 3 batches in order as following:
+    // Batches are sent in ascending order of the first message's sequence id (BatchMessageKeyBasedContainer
+    // sorts by sendArgs->sequenceId). Send order gives A=0, B=1, C=2 for first per key, so batches: A, B, C.
+    //   A: 0, 5
     //   B: 1, 3
     //   C: 2, 4
-    //   A: 0, 5
     latch.wait();
 
     std::vector<std::string> receivedKeys;
@@ -149,8 +150,8 @@ TEST_F(KeyBasedBatchingTest, testSequenceId) {
         receivedValues.emplace_back(msg.getDataAsString());
     }
 
-    decltype(receivedKeys) expectedKeys{"B", "B", "C", "C", "A", "A"};
-    decltype(receivedValues) expectedValues{"1", "3", "2", "4", "0", "5"};
+    decltype(receivedKeys) expectedKeys{"A", "A", "B", "B", "C", "C"};
+    decltype(receivedValues) expectedValues{"0", "5", "1", "3", "2", "4"};
     EXPECT_EQ(receivedKeys, expectedKeys);
     EXPECT_EQ(receivedValues, expectedValues);
 }
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index edb79e4..4220a2e 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -441,6 +441,44 @@ TEST_P(ProducerTest, testFlushNoBatch) {
     client.close();
 }
 
+// Verifies that getLastSequenceId() is correct after sendAsync + flush when batching is enabled.
+// Previously the batch used the last message's sequence_id, causing lastSequenceIdPublished_ to be
+// doubled (e.g. 3 messages yielded 4 instead of 2). The batch must use the first message's
+// sequence_id so that lastSequenceIdPublished_ = sequenceId + messagesCount - 1 is correct.
+TEST(ProducerTest, testGetLastSequenceIdAfterBatchFlush) {
+    Client client(serviceUrl);
+
+    const std::string topicName =
+        "persistent://public/default/testGetLastSequenceIdAfterBatchFlush-" + 
std::to_string(time(nullptr));
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(true);
+    producerConfiguration.setBatchingMaxMessages(10);
+    producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, 
producerConfiguration, producer));
+
+    // Send 3 messages in a batch, then flush. Sequence ids are [0, 1, 2], so getLastSequenceId() must be 2.
+    for (int i = 0; i < 3; i++) {
+        Message msg = MessageBuilder().setContent("content").build();
+        producer.sendAsync(msg, nullptr);
+    }
+    ASSERT_EQ(ResultOk, producer.flush());
+    ASSERT_EQ(producer.getLastSequenceId(), 2) << "After 3 messages, last 
sequence id should be 2";
+
+    // Send 2 more (total 5), flush. Sequence ids for these are [3, 4], so getLastSequenceId() must be 4.
+    for (int i = 0; i < 2; i++) {
+        Message msg = MessageBuilder().setContent("content").build();
+        producer.sendAsync(msg, nullptr);
+    }
+    ASSERT_EQ(ResultOk, producer.flush());
+    ASSERT_EQ(producer.getLastSequenceId(), 4) << "After 5 messages total, 
last sequence id should be 4";
+
+    producer.close();
+    client.close();
+}
+
 TEST_P(ProducerTest, testFlushBatch) {
     Client client(serviceUrl);
 
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index e4a924d..77719a1 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -22,6 +22,7 @@
 #include <time.h>
 
 #include <atomic>
+#include <chrono>
 #include <functional>
 #include <future>
 #include <set>
@@ -863,7 +864,13 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
     }
 
     ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
-    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    // After seek-to-end the broker may close the consumer and trigger reconnect; allow a short
+    // delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes).
+    for (int i = 0; i < 50; i++) {
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+        if (!hasMessageAvailable) break;
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
     ASSERT_FALSE(hasMessageAvailable);
 
     producer.send(MessageBuilder().setContent("msg-2").build());
@@ -876,7 +883,11 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
 
     // Test the 2nd seek
     ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
-    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+    for (int i = 0; i < 50; i++) {
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
+        if (!hasMessageAvailable) break;
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
     ASSERT_FALSE(hasMessageAvailable);
 }
 

Reply via email to