zbentley opened a new issue, #15609:
URL: https://github.com/apache/pulsar/issues/15609

   # Describe the bug
   Using partitioned topics and KeyShared subscriptions, when a time-based 
quota is exceeded, the quota is not properly "cleared", so producer creation 
and publication still get ProducerBlockedQuotaExceeded even when there is no 
backlog on the topic.
   
   Unloading the topic temporarily resolves the issue, but it reoccurs.
   
   # Behavior
   
   Earlier today, we had a production user with a topic that got backlogged due 
to consumer shutdown, and their producers all got 
ProducerBlockedQuotaExceededExceptions (well, actually they got UnknownErrors 
because of https://github.com/apache/pulsar/issues/15078, but the logger showed 
the ProducerBlockedQuotaExceededException).
   
   However, once consumers started and drained the backlog 
(pulsar_subscription_back_log reported 0 in prometheus for the only 
subscription on the topic), producers kept hitting the 
ProducerBlockedQuotaExceededException. New producers/new processes had the 
issue as well.
   
   Unloading the topic temporarily resolved the issue, but it reoccurred 
repeatedly. Deleting and re-creating the topic also resolved the issue 
temporarily, but it reoccurred after that as well.
   
   **This issue DOES reoccur even if consumers are present on the topic.** 
There appears to be a risk of it occurring every time the topic's backlog drops 
to 0.
   
   # To reproduce
   
   1. Create a persistent, partitioned topic with a single KeyShared 
subscription.
   1. On that topic's namespace, create a backlog policy with a short 
time-based TTL, e.g.
   ```json
       "message_age" : {
         "limitSize" : -1,
         "limitTime" : 120,
         "policy" : "producer_exception"
       }
   ```
   1. Start and stop a consumer on the subscription.
   1. Start a producer and produce one or more messages.
   1. Wait 2 minutes.
   1. Attempt to start a producer and verify that it fails to start with 
ProducerBlockedQuotaExceeded.
   1. Start a KeyShared consumer on the sole subscription and drain the topic, 
acking all messages.
   1. Reattempt to start the producer.
   1. Observe that the ProducerBlockedQuotaExceeded error still occurs.
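
The expectation behind steps 6 and 8 can be written down as a small model. This is only a sketch of the behavior the reproduction assumes, not the broker's implementation: `quota_exceeded` and its parameters are hypothetical names, and treating a non-positive `limitTime` as "no limit" is an assumption.

```python
from typing import Optional


def quota_exceeded(oldest_unacked_age_s: Optional[float], limit_time_s: int) -> bool:
    """Expected message_age check (hypothetical model, not broker code).

    oldest_unacked_age_s is None when the subscription has no backlog.
    A limit_time_s <= 0 is treated as "no time limit" (an assumption).
    """
    if limit_time_s <= 0 or oldest_unacked_age_s is None:
        return False
    return oldest_unacked_age_s > limit_time_s


LIMIT_TIME_S = 120  # limitTime from the policy in step 2

# Step 6: a message has sat unacked for more than 2 minutes, so blocking is expected.
blocked_before_drain = quota_exceeded(125.0, LIMIT_TIME_S)

# Step 8: the backlog is drained and acked, so blocking is NOT expected.
blocked_after_drain = quota_exceeded(None, LIMIT_TIME_S)

print(blocked_before_drain, blocked_after_drain)
```

Under this model the producer in step 8 should be allowed to start; the bug is that the broker still rejects it.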
   
   # Broker heap dump
   
   Available on request; it's too big for a GH attachment.
   
   # Context
   
   Linux, Client 2.8.1, broker 2.8.1, deployed either standalone or in 
StreamNative Platform
   
   Topics have 4 partitions
   
   All producers use key-based batching, all consumers use KeyShared 
subscription mode.
   
   Topic has a single KeyShared subscription.
   
   Policies on the namespace (no topic-level policies in use):
   
   ```json
    "auth_policies" : {
       "namespace_auth" : { },
       "destination_auth" : { },
       "subscription_auth_roles" : { }
     },
     "replication_clusters" : [ "sn-platform" ],
     "bundles" : {
       "boundaries" : [ "0x00000000", "0x40000000", "0x80000000", "0xc0000000", 
"0xffffffff" ],
       "numBundles" : 4
     },
     "backlog_quota_map" : {
       "message_age" : {
         "limitSize" : -1,
         "limitTime" : 7200,
         "policy" : "producer_exception"
       }
     },
     "clusterDispatchRate" : { },
     "topicDispatchRate" : { },
     "subscriptionDispatchRate" : { },
     "replicatorDispatchRate" : { },
     "clusterSubscribeRate" : { },
     "persistence" : {
       "bookkeeperEnsemble" : 2,
       "bookkeeperWriteQuorum" : 2,
       "bookkeeperAckQuorum" : 1,
       "managedLedgerMaxMarkDeleteRate" : 0.0
     },
     "deduplicationEnabled" : false,
     "autoTopicCreationOverride" : {
       "allowAutoTopicCreation" : false,
       "topicType" : "non-partitioned",
       "defaultNumPartitions" : 0
     },
     "autoSubscriptionCreationOverride" : {
       "allowAutoSubscriptionCreation" : false
     },
     "publishMaxMessageRate" : { },
     "latency_stats_sample_rate" : { },
     "message_ttl_in_seconds" : 0,
     "subscription_expiration_time_minutes" : 0,
     "retention_policies" : {
       "retentionTimeInMinutes" : 720,
       "retentionSizeInMB" : -1
     },
     "deleted" : false,
     "encryption_required" : false,
     "delayed_delivery_policies" : {
       "tickTime" : 15,
       "active" : false
     },
     "inactive_topic_policies" : {
       "inactiveTopicDeleteMode" : "delete_when_no_subscriptions",
       "maxInactiveDurationSeconds" : -1,
       "deleteWhileInactive" : false
     },
     "subscription_auth_mode" : "None",
     "max_producers_per_topic" : 100,
     "max_consumers_per_topic" : 100,
     "max_consumers_per_subscription" : 0,
     "max_unacked_messages_per_consumer" : 100,
     "max_unacked_messages_per_subscription" : 10000,
     "offload_threshold" : -1,
     "schema_auto_update_compatibility_strategy" : "AutoUpdateDisabled",
     "schema_compatibility_strategy" : "UNDEFINED",
     "is_allow_auto_update_schema" : true,
     "schema_validation_enforced" : false,
     "subscription_types_enabled" : [ ],
   ```
   
   Output of `partitioned-stats` for the topic:
   ```json
   {
     "msgRateIn" : 0.0,
     "msgThroughputIn" : 0.0,
     "msgRateOut" : 0.0,
     "msgThroughputOut" : 0.0,
     "bytesInCounter" : 1064426,
     "msgInCounter" : 1471,
     "bytesOutCounter" : 1077416,
     "msgOutCounter" : 1471,
     "averageMsgSize" : 0.0,
     "msgChunkPublished" : false,
     "storageSize" : -10905,
     "backlogSize" : -12990,
     "offloadedStorageSize" : 0,
     "lastOffloadLedgerId" : 0,
     "lastOffloadSuccessTimeStamp" : 0,
     "lastOffloadFailureTimeStamp" : 0,
     "publishers" : [ ],
     "waitingPublishers" : 0,
     "subscriptions" : {
       "chariot_subscription-perform_badging-perform_badging_1" : {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "bytesOutCounter" : 1077416,
         "msgOutCounter" : 1471,
         "msgRateRedeliver" : 0.0,
         "chunkedMessageRate" : 0,
         "msgBacklog" : 0,
         "backlogSize" : 0,
         "msgBacklogNoDelayed" : 0,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 0,
         "msgRateExpired" : 0.0,
         "totalMsgExpired" : 0,
         "lastExpireTimestamp" : 0,
         "lastConsumedFlowTimestamp" : 0,
         "lastConsumedTimestamp" : 0,
         "lastAckedTimestamp" : 0,
         "lastMarkDeleteAdvancedTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 0,
           "msgOutCounter" : 0,
           "msgRateRedeliver" : 0.0,
           "chunkedMessageRate" : 0.0,
           "availablePermits" : 20,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "readPositionWhenJoining" : "44713:1",
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 0
         } ],
         "isDurable" : true,
         "isReplicated" : false,
         "allowOutOfOrderDelivery" : false,
         "consumersAfterMarkDeletePosition" : { },
         "nonContiguousDeletedMessagesRanges" : 0,
         "nonContiguousDeletedMessagesRangesSerializedSize" : 36,
         "durable" : true,
         "replicated" : false
       }
     },
     "replication" : { },
     "nonContiguousDeletedMessagesRanges" : 0,
     "nonContiguousDeletedMessagesRangesSerializedSize" : 36,
     "compaction" : {
       "lastCompactionRemovedEventCount" : 0,
       "lastCompactionSucceedTimestamp" : 0,
       "lastCompactionFailedTimestamp" : 0,
       "lastCompactionDurationTimeInMills" : 0
     },
     "metadata" : {
       "partitions" : 4
     },
     "partitions" : { }
   }
   ```
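
In the stats above, the topic-level `storageSize` and `backlogSize` are negative while the subscription reports `msgBacklog` of 0. Whether that is connected to the stuck quota is not established in this report. The sketch below shows one way to flag such values when scanning `partitioned-stats` output; `find_negative_sizes` is a hypothetical helper, and the sample is trimmed to the relevant fields.

```python
import json


def find_negative_sizes(stats: dict) -> list:
    """Return a description of each topic-level size field that is negative."""
    anomalies = []
    for field in ("storageSize", "backlogSize"):
        value = stats.get(field, 0)
        if value < 0:
            anomalies.append(f"{field} is negative: {value}")
    return anomalies


# Trimmed copy of the partitioned-stats output shown above.
sample = json.loads("""
{
  "storageSize": -10905,
  "backlogSize": -12990,
  "subscriptions": {
    "chariot_subscription-perform_badging-perform_badging_1": {
      "msgBacklog": 0,
      "backlogSize": 0
    }
  }
}
""")

anomalies = find_negative_sizes(sample)
for line in anomalies:
    print(line)
```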
   

