4onni opened a new issue #12208:
URL: https://github.com/apache/pulsar/issues/12208
Pulsar version : 2.7.3
Pulsar Client version: 2.7.1
I have a topic that sends delayed messages, and consumers use Key_shard
mode, and whenever consumers make changes (batch reboots or adding machines)
messages start to pile up and deliveries stop. Here are the topic statistics
when stuck
```json
"persistent://infra/es-channel/coupon_activity-partition-0": {
"msgRateIn": 99.32590087869627,
"msgRateOut": 0,
"msgThroughputIn": 81197.97397460767,
"msgThroughputOut": 0,
"averageMsgSize": 817.490435589125,
"storageSize": 28423651,
"publishers": [
{
"producerId": 1,
"msgRateIn": 1.3998954266917099,
"msgThroughputIn": 645.8350889355218,
"averageMsgSize": 461,
"metadata": {}
},
{
"producerId": 2,
"msgRateIn": 97.92600545200456,
"msgThroughputIn": 80552.13888567215,
"averageMsgSize": 822,
"metadata": {}
}
],
"subscriptions": {
"exp-es-sync_order_sms_send_info_sub": {
"blockedSubscriptionOnUnackedMsgs": false,
"isReplicated": false,
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgRateRedeliver": 0,
"msgRateExpired": 0,
"msgBacklog": 31594,
"msgDelayed": 136,
"unackedMessages": 0,
"type": "Key_Shared",
"activeConsumerName": "",
"consumers": [
{
"blockedConsumerOnUnackedMsgs": false,
"availablePermits": 957,
"unackedMessages": 0,
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgRateRedeliver": 0,
"consumerName": "exp-es-sync_consumer_10.42.58.61_0",
"metadata": {}
},
{
"blockedConsumerOnUnackedMsgs": false,
"availablePermits": 836,
"unackedMessages": 0,
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgRateRedeliver": 0,
......
......
.....
```
Note: unload topic can solve the problem
Steps to reproduce the behavior:
1. a producer keeps sending delayed messages
```java
producer.newMessage().key("" + index).value("a message : " + (index))
.deliverAfter(3, TimeUnit.SECONDS)
.sendAsync()
```
2. start several consumers in Key_shard mode
```java
IntStream.range(start, end).forEach(index -> {
AtomicInteger i = new AtomicInteger();
client.newConsumer(Schema.STRING)
.topic("persistent://public/default/3p")
.subscriptionName("test_sub")
.consumerName("c_" + index)
.subscriptionType(SubscriptionType.Key_Shared)
........
.......
```
3. and then add or restart consumers
4. you'll see consumers get stuck
The expectation that no matter how consumers change, they won't get stuck
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]