4onni opened a new issue #12208:
URL: https://github.com/apache/pulsar/issues/12208


   Pulsar version : 2.7.3
   Pulsar Client version: 2.7.1
   
   I have a topic that receives delayed messages, and its consumers use the 
Key_Shared subscription type. Whenever the set of consumers changes (batch 
restarts or adding machines), messages start to pile up and delivery stops. 
Here are the topic statistics captured while it was stuck:
   ```json
   "persistent://infra/es-channel/coupon_activity-partition-0": {
         "msgRateIn": 99.32590087869627,
         "msgRateOut": 0,
         "msgThroughputIn": 81197.97397460767,
         "msgThroughputOut": 0,
         "averageMsgSize": 817.490435589125,
         "storageSize": 28423651,
         "publishers": [
           {
             "producerId": 1,
             "msgRateIn": 1.3998954266917099,
             "msgThroughputIn": 645.8350889355218,
             "averageMsgSize": 461,
             "metadata": {}
           },
           {
             "producerId": 2,
             "msgRateIn": 97.92600545200456,
             "msgThroughputIn": 80552.13888567215,
             "averageMsgSize": 822,
             "metadata": {}
           }
         ],
         "subscriptions": {
           "exp-es-sync_order_sms_send_info_sub": {
             "blockedSubscriptionOnUnackedMsgs": false,
             "isReplicated": false,
             "msgRateOut": 0,
             "msgThroughputOut": 0,
             "msgRateRedeliver": 0,
             "msgRateExpired": 0,
             "msgBacklog": 31594,
             "msgDelayed": 136,
             "unackedMessages": 0,
             "type": "Key_Shared",
             "activeConsumerName": "",
             "consumers": [
               {
                 "blockedConsumerOnUnackedMsgs": false,
                 "availablePermits": 957,
                 "unackedMessages": 0,
                 "msgRateOut": 0,
                 "msgThroughputOut": 0,
                 "msgRateRedeliver": 0,
                 "consumerName": "exp-es-sync_consumer_10.42.58.61_0",
                 "metadata": {}
               },
               {
                 "blockedConsumerOnUnackedMsgs": false,
                 "availablePermits": 836,
                 "unackedMessages": 0,
                 "msgRateOut": 0,
                 "msgThroughputOut": 0,
                 "msgRateRedeliver": 0,
   ......
   ......
   .....
   ```
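
The stuck state in the stats above has a recognizable shape: a backlog larger than the delayed count, zero dispatch rate, consumers with available permits, and no blocking on unacked messages. A rough sketch of that check follows. It is a heuristic only, not how the broker decides anything, and the class and method names are hypothetical; the field values would have to be read from the topic stats by the caller.

```java
import java.util.List;

/** Hypothetical helper: flags a subscription whose stats match the stuck pattern in this report. */
public class StuckSubscriptionCheck {

    /**
     * Heuristic (an assumption, not Pulsar broker logic): the subscription looks stuck when
     * it has more backlog than delayed messages, dispatches nothing, is not blocked on
     * unacked messages, and at least one consumer still has permits to receive.
     */
    public static boolean looksStuck(double msgRateOut,
                                     long msgBacklog,
                                     long msgDelayed,
                                     boolean blockedSubscriptionOnUnackedMsgs,
                                     List<Integer> availablePermits) {
        // Assumes backlog beyond the delayed count is due for delivery.
        boolean hasDeliverableBacklog = msgBacklog > msgDelayed;
        boolean nothingDispatched = msgRateOut == 0.0;
        boolean someConsumerHasPermits = availablePermits.stream().anyMatch(p -> p > 0);
        return hasDeliverableBacklog
                && nothingDispatched
                && !blockedSubscriptionOnUnackedMsgs
                && someConsumerHasPermits;
    }

    public static void main(String[] args) {
        // Values copied from the subscription stats in this report.
        boolean stuck = looksStuck(0.0, 31594L, 136L, false, List.of(957, 836));
        System.out.println("looks stuck: " + stuck);
    }
}
```

A single sample can be misleading right after a consumer change, so it would be safer to require the condition to hold across several consecutive stats polls before treating it as stuck.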
   
   Note: unloading the topic resolves the problem.
   
   Steps to reproduce the behavior:
   1. A producer keeps sending delayed messages:
   ```java
   // Each message is keyed by its index and delivered 3 seconds after publishing.
   producer.newMessage().key("" + index).value("a message : " + (index))
                           .deliverAfter(3, TimeUnit.SECONDS)
                           .sendAsync();
   ```
   2. Start several consumers in Key_Shared mode:
   ```java
   IntStream.range(start, end).forEach(index -> {
               AtomicInteger i  = new AtomicInteger();
               client.newConsumer(Schema.STRING)
                       .topic("persistent://public/default/3p")
                       .subscriptionName("test_sub")
                       .consumerName("c_" + index)
                       .subscriptionType(SubscriptionType.Key_Shared)
   ........
   .......
   ```
   3. Add or restart consumers.
   4. The consumers get stuck and stop receiving messages.
   
   
   
   Expected behavior: consumers should not get stuck, no matter how the set of consumers changes.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

