codelipenghui edited a comment on issue #3131: Consumer stop receive messages 
from broker
URL: https://github.com/apache/pulsar/issues/3131#issuecomment-453328896
 
 
   ### Broker dispatch process:
   1. In the test, the broker is configured with 
broker.MaxUnackMessagesPerConsumer=100 and 
broker.MaxUnackMessagesPerSubscription=500. 
   Two consumers are set up in shared mode with consumer.receiverQueueSize=100. 
The producer enables batching with producer.BatchingMaxMessages=2000.
   `PersistentDispatcherMultipleConsumers` has two fixed settings: 
`MaxReadBatchSize`=100 and `MaxRoundRobinBatchSize`=20.
   2. Once a consumer subscribes to a topic, it sends a flow request with 100 
(receiverQueueSize) permits to the broker to get messages.
   3. By default, the broker reads 100 (MaxReadBatchSize) entries from the 
ledger and distributes them between the 2 consumers; each consumer is expected 
to get 20 (MaxRoundRobinBatchSize) entries per round.
   4. Once the broker sends messages to the first consumer, the unacked 
message count becomes 40000 (20 * 2000), which is far above the unacked limit. 
The dispatcher's initial available permits are 200 (2 * 
consumer.receiverQueueSize); after sending 40000 messages (20 entries) to the 
first consumer, the available permits are 200 - 40000 = -39800, and the 
dispatcher is marked as blocked.
   5. The client consumer starts receiving messages. After it has received 50 
messages, it reaches the threshold (receiverQueueSize / 2 = 100 / 2) and sends 
a flow request to the broker again. At this point broker.consumer is blocked by 
unacked messages, so the broker does not send any messages to the consumer, and 
broker.consumer only increases permitsReceivedWhileConsumerBlocked.
   6. Since the messages were received but not acked, the consumer sends a 
redelivery request. The dispatcher should then release the flow control, but 
according to the logic in 
[broker.consumer](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L632),
 the consumer uses min(permitsReceivedWhileConsumerBlocked, totalRedelivery) to 
increase the available permits in the dispatcher.
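
The arithmetic in steps 1 to 5 can be checked with a small model. This is only a sketch of the numbers quoted above, not Pulsar code: the function names are hypothetical, it assumes every entry is a full batch of 2000 messages, and it assumes a consumer counts as blocked once its unacked count reaches the configured limit.

```python
# Simplified model of the numbers in steps 1-5; not actual Pulsar code.
MAX_UNACKED_PER_CONSUMER = 100    # broker.MaxUnackMessagesPerConsumer
RECEIVER_QUEUE_SIZE = 100         # consumer.receiverQueueSize
BATCHING_MAX_MESSAGES = 2000      # producer.BatchingMaxMessages
MAX_ROUND_ROBIN_BATCH_SIZE = 20   # entries handed to one consumer per round
NUM_CONSUMERS = 2


def dispatch_first_round():
    """Return (unacked_messages, available_permits, blocked) after the first send."""
    # Each consumer's initial flow request contributes receiverQueueSize permits.
    available_permits = NUM_CONSUMERS * RECEIVER_QUEUE_SIZE
    # Assumption: every entry is a full batch, so permits are charged per message.
    unacked_messages = MAX_ROUND_ROBIN_BATCH_SIZE * BATCHING_MAX_MESSAGES
    available_permits -= unacked_messages
    # Assumption: the consumer is blocked once unacked messages reach the limit.
    blocked = unacked_messages >= MAX_UNACKED_PER_CONSUMER
    return unacked_messages, available_permits, blocked


def flow_requests_sent(received_messages):
    """Flow requests sent by the client: one per receiverQueueSize / 2 messages."""
    return received_messages // (RECEIVER_QUEUE_SIZE // 2)


print(dispatch_first_round())     # (40000, -39800, True)
print(flow_requests_sent(10000))  # 200
```

Under these assumptions a single round-robin batch already puts the dispatcher 39800 permits in deficit, while each later flow request carries only 50 permits.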
   
   I think the problem is in step 6: if totalRedelivery < 
permitsReceivedWhileConsumerBlocked, the consumer has already sent its flow 
requests, but the broker cannot release the flow control in time, because each 
flow request carries too few permits (50) compared with the permit deficit in 
the broker (about -40000).
   e.g. 
   When a consumer receives 10000 messages (5 entries with a batch size of 
2000), it sends 200 flow requests (50 messages per request). 
   If permitsReceivedWhileConsumerBlocked = 9000 and the consumer then requests 
redelivery of 2000 messages, the dispatcher uses only 2000 to increase the 
available permits. The dispatcher is unblocked only after the consumer sends 
the flow requests for the last 1000 messages.
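
The min() rule from step 6 can be sketched with the numbers of this example. This is a simplified model, not the code in Consumer.java: the function name is hypothetical, the deduction from the blocked counter is an assumption, and the starting dispatcher balance of -39800 is borrowed from step 4 purely for illustration.

```python
# Simplified model of the credit rule described in step 6; not actual Pulsar code.
def release_on_redelivery(permits_received_while_blocked, total_redelivery,
                          dispatcher_permits):
    """Apply the min() rule once and return the updated counters."""
    released = min(permits_received_while_blocked, total_redelivery)
    # Assumption: released permits are deducted from the blocked counter.
    remaining_blocked = permits_received_while_blocked - released
    return released, remaining_blocked, dispatcher_permits + released


released, remaining_blocked, dispatcher_permits = release_on_redelivery(
    permits_received_while_blocked=9000,
    total_redelivery=2000,
    dispatcher_permits=-39800,  # illustrative balance taken from step 4
)
print(released, remaining_blocked, dispatcher_permits)  # 2000 7000 -37800
```

In this model the redelivery credits only 2000 of the 9000 permits accumulated while blocked, so the dispatcher balance stays deeply negative after the redelivery request.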
   
   This is the test log:
   
   
[stop-consume-broker.log](https://github.com/apache/pulsar/files/2747607/stop-consume-broker.log)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
