BewareMyPower opened a new pull request #13627: URL: https://github.com/apache/pulsar/pull/13627
### Motivation

This PR is a C++ catch-up of https://github.com/apache/pulsar/pull/4400, which implements [PIP 37](https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar).

### Modifications

Changes to the interfaces (a hedged usage sketch of these configs appears at the end of this description):
- Add the `chunkingEnabled` config to the producer, so that chunking can be used by enabling `chunkingEnabled` and disabling `batchingEnabled`.
- Add the `maxPendingChunkedMessage` and `autoOldestChunkedMessageOnQueueFull` configs to the consumer to control how chunks are handled.

Producer-side implementation (a sketch of the split arithmetic appears at the end of this description):
- Refactor the `sendAsync` method to follow the same logic as the Java client. If `canAddToBatch` returns false and the message size exceeds the limit, split the large message into chunks and send them one by one.
- Fix a bug in `Commands::newSend` where it didn't use `readableBytes()` as the buffer size, which could produce an incorrect checksum when sending chunks, because each chunk is a reference to the original large buffer and only the reader index differs.

Consumer-side implementation:
- Add a `MapCache` class that wraps a hash map together with a double-ended queue recording the UUIDs in order of arrival. When a UUID is removed, the related cache entry must be removed from both the map and the queue, so this class only exposes the necessary methods to guarantee that neither of the two data structures is left uncleaned. It also makes it easier for tests to verify that the cache is cleared after chunks are merged. (A simplified sketch of the idea appears at the end of this description.)
- For chunked messages, call `processMessageChunk` to cache the chunks and merge the chunks with the same UUID into the complete message once the last chunk arrives.

Tests:
- Add `MapCacheTest` to verify the public methods of `MapCache`.
- Add `MessageChunkingTest` to verify the basic end-to-end case with all compression types.

**NOTE**: This PR doesn't implement PIP 37 completely. There are some known issues that should be fixed in follow-up PRs:
1. Chunks discarded by `chunkedMessageCache_.removeOldestValues` won't be redelivered; the reason needs to be investigated. Therefore, users must configure `maxPendingChunkedMessage` to be greater than the number of producers on the topic, or simply configure it as 0.
2. If a consumer restarts, the first chunk of a partially consumed message won't be received again, so the buffered chunks will be discarded with `Received an uncached chunk` logs.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API: (**yes**)
- The schema: (yes / no / don't know)
- The default values of configurations: (yes / no)
- The wire protocol: (yes / no)
- The rest endpoints: (yes / no)
- The admin cli options: (yes / no)
- Anything that affects deployment: (yes / no / don't know)
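---

As a reference for reviewers, here is a minimal usage sketch of the new configs. It assumes the setters are exposed as `setChunkingEnabled`, `setMaxPendingChunkedMessage`, and `setAutoOldestChunkedMessageOnQueueFull` on `ProducerConfiguration`/`ConsumerConfiguration` (the exact names may differ in the final API); the topic, subscription, and service URL are placeholders.

```cpp
#include <pulsar/Client.h>
#include <string>

int main() {
    pulsar::Client client("pulsar://localhost:6650");

    // Producer: chunking requires batching to be disabled.
    pulsar::ProducerConfiguration producerConf;
    producerConf.setBatchingEnabled(false);
    producerConf.setChunkingEnabled(true);  // assumed setter name for the new `chunkingEnabled` config
    pulsar::Producer producer;
    client.createProducer("my-topic", producerConf, producer);

    // Consumer: bound the number of pending chunked messages and decide what
    // to do when the chunk cache is full.
    pulsar::ConsumerConfiguration consumerConf;
    consumerConf.setMaxPendingChunkedMessage(10);               // assumed setter name
    consumerConf.setAutoOldestChunkedMessageOnQueueFull(true);  // assumed setter name
    pulsar::Consumer consumer;
    client.subscribe("my-topic", "my-sub", consumerConf, consumer);

    // A payload larger than the broker's max message size is split into
    // chunks by the producer and reassembled on the consumer side.
    std::string largePayload(10 * 1024 * 1024, 'x');
    producer.send(pulsar::MessageBuilder().setContent(largePayload).build());

    pulsar::Message msg;
    consumer.receive(msg);

    client.close();
    return 0;
}
```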
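The producer-side split is essentially index arithmetic over the original payload buffer. The sketch below uses hypothetical helper names (not the actual client code) to show the idea, and why the per-chunk checksum must be computed over the readable bytes of each slice rather than over the whole underlying buffer:

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical view of one chunk: a pointer into the original payload plus a
// length, mirroring how each chunk shares the large buffer and only the
// reader index differs.
struct ChunkView {
    const char* data;    // points into the original buffer
    std::size_t length;  // readableBytes() of this chunk, NOT the full buffer size
};

// Split a payload into slices of at most `maxChunkSize` bytes, one per chunk.
std::vector<ChunkView> splitIntoChunks(const std::string& payload, std::size_t maxChunkSize) {
    std::vector<ChunkView> chunks;
    const std::size_t totalChunks = (payload.size() + maxChunkSize - 1) / maxChunkSize;
    for (std::size_t i = 0; i < totalChunks; i++) {
        const std::size_t readerIndex = i * maxChunkSize;
        const std::size_t length = std::min(maxChunkSize, payload.size() - readerIndex);
        // The checksum for this chunk must cover exactly `length` bytes starting
        // at `readerIndex`; using the full buffer size instead is the bug fixed
        // in Commands::newSend.
        chunks.push_back({payload.data() + readerIndex, length});
    }
    return chunks;
}
```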
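The `MapCache` idea, a hash map paired with a deque of keys in insertion order that only exposes operations keeping both structures in sync, can be sketched roughly as follows. This is a simplified illustration, not the class added by this PR:

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <unordered_map>

// Simplified sketch: a map from chunk UUID to a buffered value, plus a deque
// that remembers insertion order so the oldest entries can be evicted.
template <typename V>
class MapCache {
    std::unordered_map<std::string, V> map_;
    std::deque<std::string> keys_;  // UUIDs in order of first insertion

   public:
    V* find(const std::string& key) {
        auto it = map_.find(key);
        return (it == map_.end()) ? nullptr : &it->second;
    }

    V* putIfAbsent(const std::string& key, V value) {
        auto result = map_.emplace(key, std::move(value));
        if (result.second) {
            keys_.push_back(key);  // record insertion order only for new keys
        }
        return &result.first->second;
    }

    void remove(const std::string& key) {
        // Erase from both structures so neither is left with a stale entry.
        map_.erase(key);
        for (auto it = keys_.begin(); it != keys_.end(); ++it) {
            if (*it == key) {
                keys_.erase(it);
                break;
            }
        }
    }

    // Evict the oldest entries until at most `maxSize` remain, e.g. when the
    // maxPendingChunkedMessage limit is exceeded.
    void removeOldestValues(std::size_t maxSize) {
        while (keys_.size() > maxSize) {
            map_.erase(keys_.front());
            keys_.pop_front();
        }
    }

    std::size_t size() const { return map_.size(); }
};
```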
