BewareMyPower opened a new pull request #13627:
URL: https://github.com/apache/pulsar/pull/13627


   ### Motivation
   
   This PR is the C++ client's catch-up with 
https://github.com/apache/pulsar/pull/4400, which implements 
[PIP 37](https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar).
   
   ### Modifications
   
   Interface changes:
   - Add the `chunkingEnabled` config to the producer. A producer sends chunked 
messages when `chunkingEnabled` is enabled and `batchingEnabled` is disabled.
   - Add the `maxPendingChunkedMessage` and 
`autoAckOldestChunkedMessageOnQueueFull` configs to the consumer to control how 
chunks are handled.
   
   Producer-side implementation:
   - Refactor the `sendAsync` method to follow logic similar to the Java 
client's. If `canAddToBatch` returns false and the message size exceeds the 
limit, split the large message into chunks and send them one by one.
   - Fix a bug in `Commands::newSend`: it didn't use `readableBytes()` as the 
buffer's size, which could produce a wrong checksum when sending chunks, 
because each chunk is a reference to the original large buffer and only the 
reader index differs.
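
   The splitting step and the `readableBytes()` fix can be illustrated with a 
minimal, self-contained sketch. It does not use the client's real buffer class: 
`BufferView`, `splitIntoChunks` and `storageSize` are hypothetical names, and 
each view here also carries its own end offset, which is an assumption made to 
keep the example small.

```cpp
#include <algorithm>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a shared buffer: every view references the same
// underlying storage and differs only in its indexes.
class BufferView {
   public:
    BufferView(std::shared_ptr<const std::string> storage, size_t readerIndex, size_t writerIndex)
        : storage_(std::move(storage)), readerIndex_(readerIndex), writerIndex_(writerIndex) {}

    // Start of the bytes that belong to this chunk.
    const char* data() const { return storage_->data() + readerIndex_; }
    // Number of bytes that belong to this chunk; this is the size to checksum.
    size_t readableBytes() const { return writerIndex_ - readerIndex_; }
    // Size of the whole shared storage; using this for a chunk would be wrong.
    size_t storageSize() const { return storage_->size(); }

   private:
    std::shared_ptr<const std::string> storage_;
    size_t readerIndex_;
    size_t writerIndex_;
};

// Split a payload into views of at most maxChunkSize bytes without copying
// the payload once per chunk.
std::vector<BufferView> splitIntoChunks(const std::string& payload, size_t maxChunkSize) {
    if (maxChunkSize == 0) {
        throw std::invalid_argument("maxChunkSize must be positive");
    }
    auto storage = std::make_shared<const std::string>(payload);
    std::vector<BufferView> chunks;
    for (size_t begin = 0; begin < storage->size(); begin += maxChunkSize) {
        const size_t end = std::min(begin + maxChunkSize, storage->size());
        chunks.emplace_back(storage, begin, end);
    }
    return chunks;
}
```

   In this sketch, a 10-byte payload split with a chunk size of 4 yields three 
views whose `readableBytes()` are 4, 4 and 2, while `storageSize()` is 10 for 
all of them. That difference is why a checksum computed over anything other 
than the readable bytes would be wrong for a chunk.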
   
   Consumer-side implementation:
   - Add a `MapCache` class that wraps a hash map together with a double-ended 
queue that records the UUIDs in order of time. When a UUID is removed, the 
related entry must be removed from both the map and the queue, so this class 
exposes only the necessary methods to ensure that neither data structure is 
left uncleaned. It also makes it easier for tests to verify that the cache is 
cleared after chunks are merged.
   - For chunks, call `processMessageChunk` to cache them and merge the chunks 
with the same UUID into the complete message once the last chunk arrives.
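
   The idea behind the cache and the merge step can be sketched as follows. 
This is not the `MapCache` or `processMessageChunk` code from this PR: 
`OrderedCache`, `PendingMessage` and `processChunk` are hypothetical names, and 
dropping a message on an out-of-order chunk is an assumption made for the 
example.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical cache: a hash map plus a deque that records keys in insertion
// order. Every public method keeps the two structures in sync.
template <typename Key, typename Value>
class OrderedCache {
   public:
    Value* find(const Key& key) {
        auto it = map_.find(key);
        return it == map_.end() ? nullptr : &it->second;
    }

    Value& putIfAbsent(const Key& key, Value value) {
        auto result = map_.emplace(key, std::move(value));
        if (result.second) keys_.push_back(key);  // record order only for new keys
        return result.first->second;
    }

    void remove(const Key& key) {
        if (map_.erase(key) == 0) return;
        for (auto it = keys_.begin(); it != keys_.end(); ++it) {
            if (*it == key) {
                keys_.erase(it);
                break;
            }
        }
    }

    void removeOldest() {
        if (keys_.empty()) return;
        map_.erase(keys_.front());
        keys_.pop_front();
    }

    size_t size() const { return map_.size(); }

   private:
    std::unordered_map<Key, Value> map_;
    std::deque<Key> keys_;
};

struct PendingMessage {
    int nextChunkId = 0;   // chunk id expected next
    std::string payload;   // bytes merged so far
};

// Returns true and fills `out` once the last chunk of `uuid` has been merged.
bool processChunk(OrderedCache<std::string, PendingMessage>& cache, const std::string& uuid,
                  int chunkId, int numChunks, const std::string& data, std::string& out) {
    PendingMessage* pending = cache.find(uuid);
    if (pending == nullptr) {
        if (chunkId != 0) return false;  // uncached chunk: nothing to merge into
        pending = &cache.putIfAbsent(uuid, PendingMessage{});
    }
    if (chunkId != pending->nextChunkId) {
        cache.remove(uuid);  // assumption: drop the message on an unexpected chunk
        return false;
    }
    pending->payload += data;
    ++pending->nextChunkId;
    if (pending->nextChunkId < numChunks) return false;
    out = std::move(pending->payload);
    cache.remove(uuid);  // map and deque are cleaned together
    return true;
}
```

   Because removal goes through a single method, a test can check `size()` 
after a merge to confirm that nothing is left behind for that UUID.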
   
   Tests:
   - Add `MapCacheTest` to verify the public methods of `MapCache`.
   - Add `MessageChunkingTest` to verify the basic end-to-end case with all 
compression types.
   
   **NOTE**:
   
   This PR does not fully implement PIP 37. There are some known issues that 
should be fixed in follow-up PRs.
   1. Chunks discarded by `chunkedMessageCache_.removeOldestValues` are not 
redelivered. The reason needs to be investigated. Therefore, users must 
configure `maxPendingChunkedMessage` to be greater than the number of producers 
on the topic, or simply set it to 0.
   2. If a consumer restarts, the first chunk of consumed messages won't be 
received, so the buffered chunked messages will be discarded with `Received 
an uncached chunk` logs.
   
   
   
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (**yes**)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)

