geniusjoe opened a new pull request, #1464:
URL: https://github.com/apache/pulsar-client-go/pull/1464

   Master Issue: https://github.com/apache/pulsar-client-go/issues/1446
   Related PRs: https://github.com/apache/pulsar/pull/21070 and 
https://github.com/apache/pulsar/pull/21101
   
   ### Motivation
   Currently, the producer may resend a chunk of a chunked message, like this:
   ```
   M1: UUID: 0, ChunkID: 0
   M2: UUID: 0, ChunkID: 0 // Resend the first chunk
   M3: UUID: 0, ChunkID: 1
   ```
   When the consumer receives M2, it finds that it is already tracking the 
UUID: 0 chunked message and discards both M1 and M2. Because the context for 
UUID: 0 is removed at that point, M3 no longer matches any context and is 
dropped as well. As a result, the whole chunked message can never be consumed, 
even though it is already persisted in the Pulsar topic.
   
   Here is the code logic:
   ```Go
        if ctx == nil || ctx.chunkedMsgBuffer == nil || chunkID != ctx.lastChunkedMsgID+1 {
                lastChunkedMsgID := -1
                totalChunks := -1
                if ctx != nil {
                        lastChunkedMsgID = int(ctx.lastChunkedMsgID)
                        totalChunks = int(ctx.totalChunks)
                        ctx.chunkedMsgBuffer.Clear()
                }
                pc.log.Warnf(fmt.Sprintf(
                        "Received unexpected chunk messageId %s, last-chunk-id %d, chunkId = %d, total-chunks %d",
                        msgID.String(), lastChunkedMsgID, chunkID, totalChunks))
                pc.chunkedMsgCtxMap.remove(uuid)
                pc.availablePermits.inc()
                return nil
        }
   ```
   The bug can be easily reproduced using the test cases 
`TestChunkWithReconnection` and `TestResendChunkMessages` introduced by this PR.
   
   ### Modifications
   With this change, the chunk processing strategy is consistent with the 
behavior of the Java client:
   
https://github.com/apache/pulsar/blob/52a4d5ee84fad6af2736376a6fcdd1bc41e7c52f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1579
   
   When the consumer receives a duplicated first chunk of a chunked message, it 
discards the current chunked message context and creates a new context to track 
the following chunks. For the case described in Motivation, M1 is released and 
the consumer assembles M2 and M3 into the chunked message.
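
   The behavior described above can be sketched in isolation as follows. This 
is a minimal illustration, not the code from this PR: `chunkCtx`, 
`chunkTracker`, and `receive` are hypothetical names, and acknowledging or 
releasing the discarded chunks and handling flow permits are left out. It 
assumes that any other out-of-order chunk is still discarded, as in the 
snippet quoted in Motivation.

```go
package main

import "bytes"

// chunkCtx is a hypothetical stand-in for the consumer's per-UUID chunk context.
type chunkCtx struct {
	totalChunks int
	lastChunkID int
	buf         bytes.Buffer
}

// chunkTracker maps the UUID of a chunked message to its in-progress context.
type chunkTracker struct {
	ctxs map[string]*chunkCtx
}

func newChunkTracker() *chunkTracker {
	return &chunkTracker{ctxs: make(map[string]*chunkCtx)}
}

// receive processes one chunk. It returns the assembled payload and true once
// the last chunk has arrived; otherwise it returns nil and false.
func (t *chunkTracker) receive(uuid string, chunkID, totalChunks int, payload []byte) ([]byte, bool) {
	ctx := t.ctxs[uuid]
	if chunkID == 0 {
		// A first chunk always starts a fresh context. If a context already
		// exists, the producer resent the message, so the stale chunks are
		// dropped instead of discarding the new chunk.
		ctx = &chunkCtx{totalChunks: totalChunks, lastChunkID: -1}
		t.ctxs[uuid] = ctx
	}
	if ctx == nil || chunkID != ctx.lastChunkID+1 {
		// Orphan or out-of-order chunk: give up on this UUID (assumption,
		// mirroring the check quoted in Motivation).
		delete(t.ctxs, uuid)
		return nil, false
	}
	ctx.buf.Write(payload)
	ctx.lastChunkID = chunkID
	if chunkID == ctx.totalChunks-1 {
		// Last chunk: the message is complete and its context can be freed.
		delete(t.ctxs, uuid)
		return ctx.buf.Bytes(), true
	}
	return nil, false
}
```

   Feeding this sketch the M1, M2, M3 sequence from Motivation drops the 
payload of M1 when M2 arrives and completes the message from M2 and M3.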
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change adds tests and can be verified by running:
   - `TestChunkWithReconnection`
   - `TestResendChunkMessages`
   - `TestResendChunkWithAckHoleMessages`
   
   ### Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
   
   ### Documentation
     - Does this pull request introduce a new feature? (no)
   

