hangc0276 opened a new pull request #6719: fix consumer fetch message number 
maps to read entry number bug and expose avgMessagesPerEntry metric
URL: https://github.com/apache/pulsar/pull/6719
 
 
   ### Motivation
   When a consumer sends a fetch request to the broker, the request carries the 
number of messages the broker should push to the consumer. However, when the 
producer uses batching, the broker stores data in BookKeeper or the broker 
cache as entries, not as single messages, and one entry may hold many messages. 
There is a gap between these two units: the requested number of messages has to 
be mapped to a number of entries when handling a consumer fetch request.
   
   The current strategy handles this case with the following formula:
   ```
   messagesToRead = Math.min(availablePermits, readBatchSize);
   ```
   `availablePermits` is the number of messages the consumer requested, and 
`readBatchSize` is the maximum number of entries to read at once, set in 
broker.conf.
   
   Assume `availablePermits` is 1000, `readBatchSize` is 500, and each entry 
contains 1000 messages. The formula gives `messagesToRead = 500`, so the broker 
reads 500 entries, that is `500 * 1000 = 500,000` messages, from BookKeeper or 
the broker cache and pushes all `500,000` messages to the consumer at once, 
even though the consumer asked for only 1000. The consumer then needs far too 
much memory to hold the fetched messages, and the problem gets worse when we 
raise `readBatchSize` to increase BookKeeper read throughput.
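   
   The arithmetic in this example can be checked with a minimal sketch. The 
class and method names below are hypothetical and are not taken from the broker 
code; only the `Math.min` expression comes from the description above.
   
   ```java
   // Hypothetical illustration of the example above; not the broker's code.
   final class OldReadSizing {
       // The formula as described: permits count messages, but the result
       // is used as the number of entries to read.
       static int entriesToRead(int availablePermits, int readBatchSize) {
           return Math.min(availablePermits, readBatchSize);
       }

       public static void main(String[] args) {
           int availablePermits = 1000; // messages requested by the consumer
           int readBatchSize = 500;     // maximum entries per read
           int messagesPerEntry = 1000; // batch size assumed in the example

           int entries = entriesToRead(availablePermits, readBatchSize);
           long messagesPushed = (long) entries * messagesPerEntry;
           // prints "500 entries, 500000 messages pushed"
           System.out.println(entries + " entries, " + messagesPushed + " messages pushed");
       }
   }
   ```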
   
   ### Changes
   I add a variable `avgMessagesPerEntry` that records the average number of 
messages stored in one entry. It is updated each time the broker pushes 
messages to the consumer, using the following formula:
   ```
   avgMessagesPerEntry = avgMessagePerEntry * avgPercent
       + (1 - avgPercent) * new Value
   ```
   `avgMessagePerEntry` on the right-hand side is the historical average number 
of messages per entry, and `new Value` is the number of messages per entry in 
the data the broker just read from the cache or BookKeeper for this fetch 
request. `avgPercent` is a constant, 0.9, which controls how quickly the 
historical average decays when a new value is folded in. The initial value of 
`avgMessagePerEntry` is 1000.
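   
   A minimal sketch of this update rule, assuming the new value is computed as 
messages sent divided by entries read. The class name, the method names, and 
the parameters `messagesSent` and `entriesRead` are hypothetical; only the 
weight 0.9 and the initial value 1000 come from the description above.
   
   ```java
   // Hypothetical sketch of the moving average described above.
   final class AvgMessagesPerEntryTracker {
       static final double AVG_PERCENT = 0.9;       // weight kept on the history
       private double avgMessagesPerEntry = 1000.0; // initial value from the description

       // Folds the messages-per-entry ratio of one read into the average.
       void update(int messagesSent, int entriesRead) {
           if (entriesRead <= 0) {
               return; // nothing was read, keep the previous average
           }
           double newValue = (double) messagesSent / entriesRead;
           avgMessagesPerEntry =
                   avgMessagesPerEntry * AVG_PERCENT + (1 - AVG_PERCENT) * newValue;
       }

       double get() {
           return avgMessagesPerEntry;
       }

       public static void main(String[] args) {
           AvgMessagesPerEntryTracker tracker = new AvgMessagesPerEntryTracker();
           // Simulate a non-batching producer: every entry holds one message.
           for (int i = 1; i <= 100; i++) {
               tracker.update(100, 100);
               if (i == 1 || i == 10 || i == 100) {
                   System.out.println("after " + i + " reads: " + tracker.get());
               }
           }
       }
   }
   ```
   
   With a weight of 0.9 on the history, each update removes 10% of the distance 
between the current average and the newly observed value, so an initial value 
that is far from the real batch size takes several dozen reads to fade out.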
   
   When handling a consumer fetch request, the broker maps the requested number 
of messages to a number of entries with the following formula:
   ```
   messagesToRead = Math.min(
       (int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry),
       readBatchSize);
   ```
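   
   Applied to the example from the Motivation section, the new mapping can be 
sketched as follows. The class and method names are hypothetical, and passing 
`avgMessagesPerEntry` as a `double` is an assumption; only the expression 
itself comes from the formula above.
   
   ```java
   // Hypothetical illustration of the new mapping; not the broker's code.
   final class NewReadSizing {
       // Converts a message count into an entry count, capped at readBatchSize.
       static int entriesToRead(int availablePermits,
                                double avgMessagesPerEntry, // type is an assumption
                                int readBatchSize) {
           return Math.min(
                   (int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry),
                   readBatchSize);
       }

       public static void main(String[] args) {
           // 1000 permits, about 1000 messages per entry: prints 1
           System.out.println(entriesToRead(1000, 1000.0, 500));
           // 1000 permits, about 300 messages per entry: prints 4
           System.out.println(entriesToRead(1000, 300.0, 500));
           // No batching, one message per entry: capped at readBatchSize, prints 500
           System.out.println(entriesToRead(1000, 1.0, 500));
       }
   }
   ```
   
   With 1000 permits and 1000 messages per entry, the broker would read one 
entry instead of 500.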
   
   I also expose the `avgMessagesPerEntry` value as a metric in the consumer 
stats JSON.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
