zhanglistar opened a new pull request, #546:
URL: https://github.com/apache/pulsar-client-cpp/pull/546

   Fixes [#531](https://github.com/apache/pulsar-client-cpp/issues/531)
   
   ### Problem
   
   With batching enabled, `getLastSequenceId()` returns a wrong value after 
messages are sent with `sendAsync` and then flushed with `flush()`.
   
   - After sending 3 messages (sequence ids [0, 1, 2]), the result is **4** 
instead of **2**.
   - After sending 5 messages, the result is **8** instead of **4**.
   
   In both cases the reported sequence id is double the expected value.
   
   ### Root cause
   
   1. **Batch metadata**  
      `Commands::serializeSingleMessagesToBatchPayload` used the **last** 
message's `sequence_id` (`messages.back()`) for the batch metadata. The client 
then assumed the broker ack referred to the **first** id and computed 
`lastSequenceIdPublished_ = sequenceId + messagesCount - 1`, which is wrong 
when the broker echoes or uses the last id.
   
   2. **Ack handling**  
      In `ProducerImpl::ackReceived`, the code assumed the broker always sends 
the **first** sequence id of the batch and set `lastSequenceIdPublished_ = 
sequenceId + messagesCount - 1`. When the broker sends the **last** sequence id 
(common for batch acks), this formula double-counts and yields the wrong last 
sequence id.
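
The arithmetic behind the wrong values can be reproduced in isolation. The sketch below is not the client's code: `buggyLastSequenceId` is a hypothetical helper that applies the old formula to whatever id the ack carries, and it assumes each example is a single batch whose ids start at 0 and whose ack carries the last id.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of pulsar-client-cpp) that mirrors the old
// formula: it treats the acked id as the FIRST sequence id of the batch.
int64_t buggyLastSequenceId(int64_t ackedSequenceId, int64_t messagesCount) {
    assert(messagesCount > 0);
    return ackedSequenceId + messagesCount - 1;
}

// What the caller expects when the ack already carries the LAST id of the
// batch: the acked id itself, with no offset added.
int64_t expectedLastSequenceId(int64_t ackedSequenceId) {
    return ackedSequenceId;
}
```

Under those assumptions, `buggyLastSequenceId(2, 3)` evaluates to 4 and `buggyLastSequenceId(4, 5)` evaluates to 8, which matches the values reported in the Problem section, while the expected results are 2 and 4.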
   
   ### Solution
   
   1. **lib/Commands.cc**  
      In `serializeSingleMessagesToBatchPayload`, return the **first** 
message's `sequence_id` (`messages.front()`) for the batch metadata so the 
batch is identified by the first message's id.
   
   2. **lib/ProducerImpl.cc**  
      In `ackReceived`:
      - Treat the ack as matching the pending op when `sequenceId` is in 
`[expectedFirstSequenceId, expectedLastSequenceId]`.
      - If the broker sent the **first** id: `lastSequenceIdPublished_ = 
expectedLastSequenceId`.
      - If the broker sent the **last** id: `lastSequenceIdPublished_ = 
sequenceId`.
   
   3. **tests/ProducerTest.cc**  
      Add `testGetLastSequenceIdAfterBatchFlush` to verify:
      - After 3 messages in a batch and `flush()`, `getLastSequenceId()` is 2.
      - After 5 messages total, `getLastSequenceId()` is 4.
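
The ack handling described in step 2 can be sketched as a pair of standalone functions. This is an illustration only, not the code in `ProducerImpl::ackReceived`: `PendingBatch`, `ackMatches`, and `resolveLastSequenceId` are hypothetical names. The description does not say what happens for an id strictly between the first and last, so the sketch assumes the acked id is returned unchanged in that case.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a pending batch; the field names are
// illustrative and are not taken from ProducerImpl.
struct PendingBatch {
    int64_t firstSequenceId;  // sequence id of the first message in the batch
    int64_t messagesCount;    // number of messages in the batch
};

// True when the acked id lies in [expectedFirst, expectedLast].
bool ackMatches(const PendingBatch& batch, int64_t ackedSequenceId) {
    const int64_t expectedLast = batch.firstSequenceId + batch.messagesCount - 1;
    return ackedSequenceId >= batch.firstSequenceId && ackedSequenceId <= expectedLast;
}

// Resolves the last published sequence id for a matching ack.
int64_t resolveLastSequenceId(const PendingBatch& batch, int64_t ackedSequenceId) {
    assert(batch.messagesCount > 0);
    assert(ackMatches(batch, ackedSequenceId));
    const int64_t expectedLast = batch.firstSequenceId + batch.messagesCount - 1;
    if (ackedSequenceId == batch.firstSequenceId) {
        return expectedLast;  // broker sent the first id of the batch
    }
    // Broker sent the last id. Assumption: any other in-range id is also
    // returned as-is, since the description does not cover that case.
    return ackedSequenceId;
}
```

For a batch with ids [0, 1, 2], both an ack carrying 0 and an ack carrying 2 resolve to 2, so the result no longer depends on which end of the batch the broker reports.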
   
   ### How to verify
   
       ./tests/pulsar_tests --gtest_filter="ProducerTest.testGetLastSequenceIdAfterBatchFlush"

