yangou opened a new issue #10497:
URL: https://github.com/apache/pulsar/issues/10497


   **Describe the bug**
   Delayed messages won't get delivered on partitioned topics.  Happens with 
both the built-in command-line consumer and golang consumer.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a partitioned topic, set the namespace policies as below:
   ```yaml
   bundles:
     numBundles: 16                                                     # 
bundles serve the purpose of topic load balancing among brokers
   backlog_quota_map:
     destination_storage:
       limit: 1099511627776                                             # 1TB, 
maximal backlog size of unack'ed messages
       policy: producer_request_hold                                    # keep 
the producer waiting/timeout when backlog reaches limit
   deduplicationEnabled: false
   autoTopicCreationOverride:
     allowAutoTopicCreation: false
   autoSubscriptionCreationOverride:
     allowAutoSubscriptionCreation: false
   message_ttl_in_seconds: 0                                            # don't 
expire unconsumed messages
   subscription_expiration_time_minutes: 0                              # don't 
expire subscription once inactive(no connections, no consumptions, etc)
   retention_policies:
     retentionTimeInMinutes: 0                                          # don't 
retent the messages around once they are consumed
     retentionSizeInMB: 0                                               # save 
as above
   delayed_delivery_policies:
     active: true                                                       # 
enable delayed delivery in now namespace
     tickTime: 1000                                                     # tick 
time for delayed delivery
   inactive_topic_policies:
     deleteWhileInactive: false                                         # 
disable auto-deletion on inactive topics
   ```
   2. Create the subscription manually with Latest position and Shared mode
   3. Launch the command line consumer for 1000 messages
   4. Try a couple of time, the command line consumer would stuck waiting for 
incoming messages.
   5. Check partitioned-stats with admin CLI you could potentially get below:
   6. subscription `scheduler` was from golang client, while subscription 
`test` was with CLI consumer. Restarting consumers would trigger pulsar broker 
to flush the messages
   ```yaml
   {
     "msgRateIn" : 0.0,
     "msgThroughputIn" : 0.0,
     "msgRateOut" : 0.0,
     "msgThroughputOut" : 0.0,
     "bytesInCounter" : 349869,
     "msgInCounter" : 2906,
     "bytesOutCounter" : 316008,
     "msgOutCounter" : 2618,
     "averageMsgSize" : 0.0,
     "msgChunkPublished" : false,
     "storageSize" : 349869,
     "backlogSize" : 41503,
     "offloadedStorageSize" : 0,
     "publishers" : [ {
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0
     } ],
     "subscriptions" : {
       "scheduler" : {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "bytesOutCounter" : 216457,
         "msgOutCounter" : 1794,
         "msgRateRedeliver" : 0.0,
         "chuckedMessageRate" : 0,
         "msgBacklog" : 134,
         "backlogSize" : 0,
         "msgBacklogNoDelayed" : 0,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 134,
         "unackedMessages" : 0,
         "msgRateExpired" : 0.0,
         "totalMsgExpired" : 0,
         "lastExpireTimestamp" : 0,
         "lastConsumedFlowTimestamp" : 0,
         "lastConsumedTimestamp" : 0,
         "lastAckedTimestamp" : 0,
         "lastMarkDeleteAdvancedTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 216457,
           "msgOutCounter" : 1794,
           "msgRateRedeliver" : 0.0,
           "chuckedMessageRate" : 0.0,
           "availablePermits" : 6256,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 0
         } ],
         "isDurable" : true,
         "isReplicated" : false,
         "consumersAfterMarkDeletePosition" : { },
         "nonContiguousDeletedMessagesRanges" : 29,
         "nonContiguousDeletedMessagesRangesSerializedSize" : 2824
       },
       "test" : {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "bytesOutCounter" : 99551,
         "msgOutCounter" : 824,
         "msgRateRedeliver" : 0.0,
         "chuckedMessageRate" : 0,
         "msgBacklog" : 134,
         "backlogSize" : 0,
         "msgBacklogNoDelayed" : 0,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 134,
         "unackedMessages" : 0,
         "msgRateExpired" : 0.0,
         "totalMsgExpired" : 0,
         "lastExpireTimestamp" : 0,
         "lastConsumedFlowTimestamp" : 0,
         "lastConsumedTimestamp" : 0,
         "lastAckedTimestamp" : 0,
         "lastMarkDeleteAdvancedTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 99551,
           "msgOutCounter" : 824,
           "msgRateRedeliver" : 0.0,
           "chuckedMessageRate" : 0.0,
           "availablePermits" : 63176,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 0
         } ],
         "isDurable" : true,
         "isReplicated" : false,
         "consumersAfterMarkDeletePosition" : { },
         "nonContiguousDeletedMessagesRanges" : 29,
         "nonContiguousDeletedMessagesRangesSerializedSize" : 2670
       }
     },
     "replication" : { },
     "nonContiguousDeletedMessagesRanges" : 58,
     "nonContiguousDeletedMessagesRangesSerializedSize" : 5494,
     "metadata" : {
       "partitions" : 64
     },
     "partitions" : { }
   }
   
   ```
   
   **Expected behavior**
   Messages should be cleared out with restart of consumers
   
   
   **Additional context**
   Add any other context about the problem here.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to