codelipenghui edited a comment on issue #3131: Consumer stop receive messages from broker
URL: https://github.com/apache/pulsar/issues/3131#issuecomment-453328896

### Broker dispatch process:

1. The test uses these settings: `broker.MaxUnackMessagesPerConsumer=100` and `broker.MaxUnackMessagesPerSubscription=500`. Two consumers are set up in shared mode with `consumer.receiverQueueSize=100`. The producer enables batching with `producer.BatchingMaxMessages=2000`. `PersistentDispatcherMultipleConsumers` has two fixed settings: `MaxReadBatchSize`=100 and `MaxRoundRobinBatchSize`=20.
2. Once a consumer subscribes to a topic, it sends a flow request with 100 (receiverQueueSize) permits to the broker to get messages.
3. By default, the broker reads 100 (MaxReadBatchSize) entries from the ledger and distributes them to the 2 consumers; each consumer expects to get 20 (MaxRoundRobinBatchSize) entries.
4. Once the broker sends messages to the first consumer, unacked messages will be 40000 (20 * 2000), so the dispatcher will be blocked. The dispatcher's initial available permits are 200 (2 * consumer.receiverQueueSize); after sending 40000 messages (20 permits) to the first consumer, the available permits are 200 - 40000 = -39800, and the dispatcher is set as blocked.
5. The client consumer starts receiving messages. After it has received 50 messages, it reaches the threshold (receiverQueueSize / 2 = 100 / 2), so the consumer sends a flow request to the broker again. But broker.consumer is now blocked by unacked messages, so the broker does not send any messages to the consumer, and broker.consumer increases `permitsReceivedWhileConsumerBlocked`.
6. Since the messages were received but not acked, the consumer will send a redelivery request, and the dispatcher should release the flow control. But with the logic in [broker.consumer](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L632), the consumer uses min(permitsReceivedWhileConsumerBlocked, totalRedelivery) to increase the available permits in the dispatcher.

I think the problem is here: if totalRedelivery < permitsReceivedWhileConsumerBlocked, the consumer has already sent its flow requests, but the broker can't release the flow control in time, because the permits in each flow request are too small (50) compared to the permits in the broker (-40000).

e.g. When a consumer receives 10000 messages (5 batched messages, batch size 2000), it will send 200 flow requests (50 messages per request). If permitsReceivedWhileConsumerBlocked = 9000 and at this point the consumer redelivers 2000 messages, the dispatcher will use 2000 to increase the available permits. Only after the consumer sends the flow requests for the last 1000 messages is the dispatcher unblocked.

This is the test log: [stop-consume-broker.log](https://github.com/apache/pulsar/files/2747607/stop-consume-broker.log)
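To make the numbers above easier to check, here is a minimal sketch of the arithmetic only. It is not the broker code: the class `PermitSketch` and its method names are hypothetical, and it assumes permits are counted per message, as the figures in this comment imply.

```java
// Illustrative arithmetic only; names are hypothetical, not Pulsar APIs.
public class PermitSketch {

    // Step 4: available permits after the dispatcher sends `entries`
    // batched entries of `batchSize` messages each.
    static int permitsAfterDispatch(int consumers, int receiverQueueSize,
                                    int entries, int batchSize) {
        int initialPermits = consumers * receiverQueueSize;
        int messagesSent = entries * batchSize;
        return initialPermits - messagesSent;
    }

    // Step 6: permits given back to the dispatcher on a redelivery request,
    // following the min(...) rule described in the comment.
    static int permitsReleasedOnRedelivery(int permitsReceivedWhileConsumerBlocked,
                                           int totalRedelivery) {
        return Math.min(permitsReceivedWhileConsumerBlocked, totalRedelivery);
    }

    // Number of flow requests a consumer sends for `messages` received
    // messages when it sends one request every `permitsPerFlow` messages.
    static int flowRequests(int messages, int permitsPerFlow) {
        return messages / permitsPerFlow;
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterDispatch(2, 100, 20, 2000));  // -39800
        System.out.println(permitsReleasedOnRedelivery(9000, 2000)); // 2000
        System.out.println(flowRequests(10000, 50));                 // 200
    }
}
```

With the values from the example, a redelivery of 2000 messages releases only 2000 permits even though 9000 permits were already requested while the consumer was blocked.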
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services