geniusjoe opened a new pull request, #587:
URL: https://github.com/apache/pulsar-client-cpp/pull/587

   Master Issue: https://github.com/apache/pulsar/pull/13627
   
   Related Issue: apache/pulsar#21070 and apache/pulsar#21101
   
   ### Motivation
   
   apache/pulsar#21070 and apache/pulsar#21101 fixed two critical issues in the
   Java client's chunked message handling:
   
   1. **Unable to reassemble chunked messages after redelivery**: When a chunked
      message is redelivered (e.g., due to a broker unload or reconnect), the
      consumer receives duplicated chunks. The old code could not handle this
      correctly:
      - For a duplicated first chunk (chunkId=0): the old context was not properly
        cleaned up and restarted, so the message was never assembled.
      - For duplicated middle chunks: the chunk was rejected (since
        chunkId ≤ lastChunkedMessageId), and the old code discarded the context
        entirely, making the message unrecoverable.
   
   2. **Ack holes caused by corrupted or orphaned chunks**: When a different
      producer reuses the same uuid (the corrupted chunk scenario), or when a chunk
      context is discarded due to a gap or expiration, the stale cached chunks or
      the incoming corrupted chunks were never acknowledged. This causes the
      broker subscription cursor to get stuck, leading to a message backlog that
      never drains, even after all logically valid messages have been consumed
      and acknowledged.
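
   The ack-hole effect in item 2 can be illustrated with a toy cursor model. This
   is a minimal sketch, not the broker's implementation: `ToyCursor` is
   hypothetical, it assumes a single ledger whose entries are numbered 0..N-1, and
   it treats the backlog simply as the count of entries never acknowledged. It
   shows how one unacked orphaned chunk keeps the mark-delete position from
   advancing even when every later entry has been acked.

   ```cpp
   #include <cstdint>
   #include <set>

   // Hypothetical toy model of one subscription cursor over entries 0..total-1 of a single ledger.
   // The mark-delete position only advances across a contiguous prefix of acked entries;
   // individually acked entries after a hole are remembered but cannot move it.
   class ToyCursor {
   public:
       explicit ToyCursor(int64_t totalEntries) : total_(totalEntries) {}

       void ack(int64_t entry) {
           if (entry < 0 || entry >= total_) return;  // ignore ids outside the toy ledger
           acked_.insert(entry);
           // Slide the mark-delete position over any newly contiguous acked entries.
           while (acked_.count(markDelete_ + 1) > 0) ++markDelete_;
       }

       // Entries never acknowledged; in this model that is the backlog.
       int64_t backlog() const { return total_ - static_cast<int64_t>(acked_.size()); }

       // Last entry covered by the mark-delete position (-1 means none yet).
       int64_t markDeletePosition() const { return markDelete_; }

   private:
       int64_t total_;
       int64_t markDelete_ = -1;
       std::set<int64_t> acked_;
   };
   ```

   For example, with five entries where entry 0 is an orphaned chunk, acking
   entries 1 to 4 leaves a backlog of 1 and the mark-delete position at -1. Acking
   entry 0 as well, which is what the fix does for stale chunks, drains the backlog
   and moves the mark-delete position to 4.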
   
   These PRs added logic to distinguish between redelivery (same messageId) and
   corruption (different messageId). This lets the consumer correctly restart
   chunk assembly on redelivery while acking stale chunks on corruption to
   prevent ack holes.
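
   The redelivery-versus-corruption check can be sketched as follows. This sketch
   assumes a simplified `EntryId` (ledger id plus entry id) in place of the
   client's `MessageId`. The names `EntryId`, `DuplicateKind`, and
   `classifyDuplicate` are hypothetical. A duplicate counts as a redelivery when
   its id matches any chunk already cached for the uuid. Otherwise it counts as
   corruption.

   ```cpp
   #include <cstdint>
   #include <vector>

   // Hypothetical, simplified broker position (the real client uses MessageId).
   struct EntryId {
       int64_t ledgerId;
       int64_t entryId;
   };

   enum class DuplicateKind { Redelivery, Corruption };

   // `cached` holds the ids of the chunks already buffered for one uuid.
   // A duplicate whose (ledgerId, entryId) matches any cached chunk is the same entry
   // delivered again; otherwise a different entry reused the uuid.
   DuplicateKind classifyDuplicate(const std::vector<EntryId>& cached, const EntryId& incoming) {
       for (const auto& id : cached) {
           if (id.ledgerId == incoming.ledgerId && id.entryId == incoming.entryId) {
               return DuplicateKind::Redelivery;
           }
       }
       return DuplicateKind::Corruption;
   }
   ```

   For instance, suppose chunks 0 and 1 of a uuid were cached as 1:1 and 1:2. A
   repeated chunk 0 arriving as 1:1 is a redelivery. One arriving as 1:3 means
   that another entry reused the uuid.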
   
   The C++ client had the same issues. This PR ports the equivalent logic to
   ensure consistent behavior across all client implementations.
   
   **Note**: Currently, after a chunked message is assembled, the ackTimeout and
   nack logic only tracks/handles the last chunk message (i.e., the final
   messageId of the assembled message). As a result, if ackTimeout or nack
   triggers a redelivery, only the last chunk entry is redelivered rather than
   all chunk entries. This limitation needs to be addressed in a follow-up PR.
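
   The sketch below illustrates the limitation in the note. It models what an
   ackTimeout or nack tracker would hand back for redelivery. The names
   `AssembledChunkedMessage`, `EntryId`, and `entriesToRedeliver` are
   hypothetical. Passing `trackAllChunks = false` models the current behavior
   described above, where only the last chunk's id is tracked. Passing `true`
   models what a follow-up would presumably need to cover.

   ```cpp
   #include <cstdint>
   #include <vector>

   // Hypothetical, simplified broker position (the real client uses MessageId).
   struct EntryId {
       int64_t ledgerId;
       int64_t entryId;
   };

   // Hypothetical view of an assembled chunked message: one id per chunk entry, in chunk order.
   struct AssembledChunkedMessage {
       std::vector<EntryId> chunkIds;
   };

   // Entries an ackTimeout/nack tracker would ask the broker to redeliver.
   // trackAllChunks == false models the behavior described in the note: only the
   // last chunk's id is tracked, so only that entry comes back.
   std::vector<EntryId> entriesToRedeliver(const AssembledChunkedMessage& msg, bool trackAllChunks) {
       if (msg.chunkIds.empty()) return {};
       if (!trackAllChunks) return {msg.chunkIds.back()};
       return msg.chunkIds;
   }
   ```

   Take a message split into three chunk entries. The current behavior yields a
   single entry, the last chunk. Tracking every chunk would yield all three.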
   
   ### Modifications
   
   **Core logic changes in `ConsumerImpl.cc` (`processMessageChunk`)**:
   
   - **Part 1 (chunkId == 0)**: When a duplicated first chunk arrives for a uuid
     that already has an incomplete context, detect whether it is a redelivery
     (same messageId in cache) or corruption (different messageId). For
     redelivery: remove the old context and restart assembling. For corruption:
     ack all cached chunks to avoid ack holes, then restart.
   
   - **Part 3 (duplicated middle chunk)**: When a chunk with
     chunkId ≤ lastChunkedMessageId arrives, detect whether it is a redelivery or
     corruption. For redelivery: simply discard the duplicate and continue waiting
     for the next expected chunk. For corruption: ack the corrupted chunk to avoid
     ack holes.
   
   - **Part 3 (gap chunk)**: When a chunk arrives that skips one or more expected
     chunk IDs, the context is cleaned up, and the chunk is acked if it has
     expired to avoid ack holes.
   
   - Added `LOG_WARN` and `LOG_INFO` for observability across all scenarios.
   
   - Added detailed comments explaining each part of the chunk processing logic,
     with examples.
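
   The decision flow of Part 1 and Part 3 can be summarized in a simplified,
   single-threaded C++ sketch. It is not the `ConsumerImpl.cc` code. Several
   simplifications apply:
   - `ChunkAssemblerSketch`, `Chunk`, `ChunkContext`, and `EntryId` are
     hypothetical names.
   - Payload buffering, permits, and real acknowledgments are left out. Acks are
     only recorded in a vector.
   - The `expired` flag stands in for the incomplete-chunk expiration check.
   - The sketch assumes that every chunk of a uuid carries the same `numChunks`.

   ```cpp
   #include <cstddef>
   #include <cstdint>
   #include <map>
   #include <string>
   #include <vector>

   // Hypothetical, simplified broker position (the real client uses MessageId).
   struct EntryId {
       int64_t ledgerId;
       int64_t entryId;
   };

   // Hypothetical chunk view: only the fields the decision flow needs.
   struct Chunk {
       std::string uuid;
       int chunkId;
       int numChunks;
       EntryId id;
       bool expired;  // stands in for the incomplete-chunk expiration check
   };

   // Per-uuid assembly state; ids[i] is the entry that carried chunk i.
   struct ChunkContext {
       std::vector<EntryId> ids;
       int lastChunkId() const { return static_cast<int>(ids.size()) - 1; }
       bool contains(const EntryId& e) const {
           for (const auto& id : ids) {
               if (id.ledgerId == e.ledgerId && id.entryId == e.entryId) return true;
           }
           return false;
       }
   };

   // Hypothetical, single-threaded sketch of the Part 1 / Part 3 decisions.
   class ChunkAssemblerSketch {
   public:
       std::vector<EntryId> acked;          // entries the sketch chose to acknowledge
       std::vector<std::string> completed;  // uuids whose chunks were all received

       void process(const Chunk& c) {
           auto it = contexts_.find(c.uuid);
           if (c.chunkId == 0) {
               // Part 1: a first chunk arrives while a context for this uuid is still incomplete.
               if (it != contexts_.end()) {
                   if (!it->second.contains(c.id)) {
                       // Corruption: the cached chunks will never be delivered, so ack them.
                       acked.insert(acked.end(), it->second.ids.begin(), it->second.ids.end());
                   }
                   contexts_.erase(it);  // redelivery or corruption: restart assembling
               }
               it = contexts_.emplace(c.uuid, ChunkContext{}).first;
           } else if (it == contexts_.end() || c.chunkId != it->second.lastChunkId() + 1) {
               if (it != contexts_.end() && c.chunkId <= it->second.lastChunkId()) {
                   // Part 3, duplicated middle chunk: keep the context, wait for the next chunk.
                   if (!it->second.contains(c.id)) acked.push_back(c.id);  // corruption only
                   return;
               }
               // Part 3, gap (or no context at all): drop the context; ack only if expired.
               if (it != contexts_.end()) contexts_.erase(it);
               if (c.expired) acked.push_back(c.id);
               return;
           }
           it->second.ids.push_back(c.id);  // the expected next chunk
           if (c.chunkId == c.numChunks - 1) {
               completed.push_back(c.uuid);
               contexts_.erase(it);
           }
       }

       std::size_t pending() const { return contexts_.size(); }

   private:
       std::map<std::string, ChunkContext> contexts_;
   };
   ```

   Consider chunk IDs 0, 1, 0, 1, 2 carried by entries 1:1, 1:2, 1:1, 1:2, 1:3.
   The message completes without acking anything (redelivery). If the same chunk
   IDs are carried by entries 1:1, 1:2, 1:3, 1:4, 1:5, the sketch acks 1:1 and
   1:2 and then completes the message (corruption). The duplicated-middle-chunk
   path is kept separate from the gap path. This is what lets a redelivered middle
   chunk be dropped without discarding the context.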
   
   **Test changes in `MessageChunkingTest.cc`**:
   
   - Added `testResendChunkMessagesWithoutAckHole`: Verifies that resending the
     first chunk (chunkId=0) allows correct reassembly without ack holes.

   - Added `testResendChunkMessages`: Verifies that interleaved chunk resends
     across multiple uuids assemble correctly.

   - Added `testResendChunkWithAckHoleMessages`: Verifies that duplicated middle
     chunks are filtered correctly and that chunk gaps cause context cleanup.

   - Refactored existing tests to reuse the `sendSingleChunk` helper function for
     better readability.
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added `testResendChunkMessagesWithoutAckHole` to verify correct reassembly
     when the first chunk is resent, checking that the broker backlog drops to 0
     after ack.
   - Added `testResendChunkMessages` to verify correct assembly with interleaved
     resends across multiple uuids.
   - Added `testResendChunkWithAckHoleMessages` to verify that duplicated middle
     chunks are filtered and that gap chunks cause context cleanup.
   
   ### Documentation
   
   - [x] `doc-not-needed`
   (This is a bug fix for chunk message handling internals with no user-facing
   API changes)
   

