hangc0276 opened a new pull request #6719: fix consumer fetch message number maps to read entry number bug and expose avgMessagesPerEntry metric
URL: https://github.com/apache/pulsar/pull/6719

### Motivation

When a consumer sends a fetch request to the broker, the request carries a message count that tells the broker how many messages to push to the consumer client. However, when the producer uses batching, the broker stores data in BookKeeper or the broker cache per entry, not per individual message. So there is a gap when mapping the number of requested messages to the number of entries to read.

The current strategy handles this with the following formula:

```
messagesToRead = Math.min(availablePermits, readBatchSize);
```

`availablePermits` is the number of messages the consumer requested, and `readBatchSize` is the maximum number of entries per read, set in `broker.conf`. Despite its name, `messagesToRead` is therefore used as a number of entries.

Assume `availablePermits` is 1000, `readBatchSize` is 500, and each entry contains 1000 messages. The formula gives `messagesToRead = 500`, so the broker reads 500 entries, that is `500 * 1000 = 500,000` messages, from BookKeeper or the broker cache and pushes all `500,000` messages to the consumer at once, even though the consumer only asked for 1000. The consumer then needs far too much memory to hold the fetched messages, and the problem gets worse when `readBatchSize` is increased to raise BookKeeper read throughput.

### Changes

I added a variable `avgMessagesPerEntry` that records the average number of messages stored in one entry. It is updated whenever the broker pushes messages to the consumer, using the following formula:

```
avgMessagesPerEntry = avgMessagesPerEntry * avgPercent + (1 - avgPercent) * newValue
```

`avgMessagesPerEntry` on the right-hand side is the historical average number of messages per entry, and `newValue` is the number of messages per entry in the data the broker just read from the cache or BookKeeper for the fetch request.
`avgPercent` is a `final` constant with value 0.9; it controls how quickly the historical `avgMessagesPerEntry` decays when a new value is folded in. The initial value of `avgMessagesPerEntry` is 1000.

When handling a consumer fetch request, the broker now maps the requested message count to an entry count using the following formula:

```
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
```

I also expose the `avgMessagesPerEntry` value in the consumer stats metrics JSON.
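The old formula, the new formula, and the moving-average update can be put side by side in a small Java sketch. This is a minimal, standalone illustration assuming the average is updated once per completed read with that read's message and entry totals; the class `FetchSizing` and its methods `oldEntriesToRead`, `newEntriesToRead`, `update`, and `getAvgMessagesPerEntry` are hypothetical names, not Pulsar's actual dispatcher code.

```java
// Hypothetical sketch of the permit-to-entry mapping described above.
// Not the actual Pulsar dispatcher implementation.
class FetchSizing {
    // Weight kept for the historical average (0.9 in the PR description).
    private static final double AVG_PERCENT = 0.9;

    // Running average of messages per entry; starts at 1000 per the description.
    private double avgMessagesPerEntry = 1000;

    // Old strategy: treats the permit count (messages) as an entry count.
    static int oldEntriesToRead(int availablePermits, int readBatchSize) {
        return Math.min(availablePermits, readBatchSize);
    }

    // New strategy: converts permits to entries using the running average,
    // still capped by readBatchSize.
    int newEntriesToRead(int availablePermits, int readBatchSize) {
        int entries = (int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry);
        return Math.min(entries, readBatchSize);
    }

    // Exponential moving average update.
    // Assumption: called once per completed read with that read's totals.
    void update(int messages, int entries) {
        if (entries == 0) {
            return; // nothing was read, keep the previous average
        }
        double newValue = (double) messages / entries;
        avgMessagesPerEntry = avgMessagesPerEntry * AVG_PERCENT + (1 - AVG_PERCENT) * newValue;
    }

    double getAvgMessagesPerEntry() {
        return avgMessagesPerEntry;
    }

    public static void main(String[] args) {
        // Numbers from the Motivation example.
        int availablePermits = 1000;
        int readBatchSize = 500;
        int messagesPerEntry = 1000;

        int oldEntries = oldEntriesToRead(availablePermits, readBatchSize);
        System.out.println("old: " + oldEntries + " entries = "
                + (long) oldEntries * messagesPerEntry + " messages");

        FetchSizing sizing = new FetchSizing();
        int newEntries = sizing.newEntriesToRead(availablePermits, readBatchSize);
        System.out.println("new: " + newEntries + " entries = "
                + (long) newEntries * messagesPerEntry + " messages");
    }
}
```

With the Motivation numbers, the old formula reads 500 entries (500,000 messages) while the new one reads a single entry (1000 messages). Because of `Math.ceil`, any positive permit count maps to at least one entry, and the moving average keeps a single unusually small or large batch from swinging the estimate too far.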
