zbentley opened a new issue, #15609: URL: https://github.com/apache/pulsar/issues/15609
# Describe the bug

With partitioned topics and KeyShared subscriptions, a time-based backlog quota that has been exceeded is not properly "cleared": producer creation and publishing still fail with ProducerBlockedQuotaExceeded even when there is no backlog on the topic. Unloading the topic temporarily resolves the issue, but it reoccurs.

# Behavior

Earlier today, we had a production user with a topic that got backlogged due to consumer shutdown, and their producers all got ProducerBlockedQuotaExceededExceptions (well, actually they got UnknownErrors because of https://github.com/apache/pulsar/issues/15078, but the logger showed the ProducerBlockedQuotaExceededException).

However, once consumers started and drained the backlog (pulsar_subscription_back_log reported 0 in Prometheus for the only subscription on the topic), producers kept hitting the ProducerBlockedQuotaExceededException. New producers and new processes had the issue as well.

Unloading the topic temporarily resolved the issue, but it reoccurred repeatedly. Deleting and re-creating the topic also resolved the issue, but it reoccurred after that as well.

**This issue DOES reoccur even if consumers are present on the topic.** There appears to be a risk of it occurring every time the topic's backlog drops to 0.

# To reproduce

1. Create a persistent, partitioned topic with a single KeyShared subscription.
1. On that topic's namespace, create a backlog policy with a short time-based limit, e.g.
   ```
   "message_age" : {
     "limitSize" : -1,
     "limitTime" : 120,
     "policy" : "producer_exception"
   }
   ```
1. Start and stop a consumer on the subscription.
1. Start a producer and produce one or more messages.
1. Wait 2 minutes.
1. Attempt to start a producer and verify that it fails to start with ProducerBlockedQuotaExceeded.
1. Start a KeyShared consumer on the sole subscription and drain the topic, acking all messages.
1. Reattempt to start the producer.
1. Observe that the ProducerBlockedQuotaExceeded error still occurs.
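To make the expected behavior in the steps above concrete, here is a minimal sketch of how a time-based backlog quota would be expected to behave from the producer's point of view. It is a toy in-memory model, not broker code: `TopicModel` and all of its methods are hypothetical names invented for illustration, and the rule it encodes (the quota is exceeded only while the oldest unacknowledged message is older than `limitTime`) is an assumption about the intended semantics.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TopicModel:
    """Toy model of a topic with a time-based backlog quota (hypothetical, not Pulsar code)."""

    limit_time_s: float  # mirrors "limitTime" in the backlog policy
    unacked_publish_times: List[float] = field(default_factory=list)

    def quota_exceeded(self, now: float) -> bool:
        # Assumed semantics: an empty backlog can never exceed the quota.
        if not self.unacked_publish_times:
            return False
        oldest = min(self.unacked_publish_times)
        return now - oldest > self.limit_time_s

    def publish(self, now: float) -> None:
        # Mirrors the "producer_exception" policy: reject while over quota.
        if self.quota_exceeded(now):
            raise RuntimeError("ProducerBlockedQuotaExceeded")
        self.unacked_publish_times.append(now)

    def ack_all(self) -> None:
        # A consumer draining the topic and acking every message.
        self.unacked_publish_times.clear()


def replay_repro_steps() -> List[bool]:
    """Walk the reproduction steps and record whether the quota is exceeded."""
    topic = TopicModel(limit_time_s=120)
    topic.publish(now=0.0)                 # produce one message
    observed = [topic.quota_exceeded(now=121.0)]  # after waiting 2 minutes
    topic.ack_all()                        # consumer drains the backlog
    observed.append(topic.quota_exceeded(now=121.0))
    return observed
```

In this model `replay_repro_steps()` reports the quota as exceeded after the wait and as cleared once the backlog is drained. The bug described here is that the real broker keeps rejecting producers in that second state.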
# Broker heap dump

Available on request; it's too big for a GH attachment.

# Context

- Linux, client 2.8.1, broker 2.8.1, deployed either standalone or in StreamNative Platform.
- Topics have 4 partitions.
- All producers use key-based batching; all consumers use KeyShared subscription mode.
- Topic has a single KeyShared subscription.

Policies on the namespace (no topic-level policies in use):

```json
"auth_policies" : {
  "namespace_auth" : { },
  "destination_auth" : { },
  "subscription_auth_roles" : { }
},
"replication_clusters" : [ "sn-platform" ],
"bundles" : {
  "boundaries" : [ "0x00000000", "0x40000000", "0x80000000", "0xc0000000", "0xffffffff" ],
  "numBundles" : 4
},
"backlog_quota_map" : {
  "message_age" : {
    "limitSize" : -1,
    "limitTime" : 7200,
    "policy" : "producer_exception"
  }
},
"clusterDispatchRate" : { },
"topicDispatchRate" : { },
"subscriptionDispatchRate" : { },
"replicatorDispatchRate" : { },
"clusterSubscribeRate" : { },
"persistence" : {
  "bookkeeperEnsemble" : 2,
  "bookkeeperWriteQuorum" : 2,
  "bookkeeperAckQuorum" : 1,
  "managedLedgerMaxMarkDeleteRate" : 0.0
},
"deduplicationEnabled" : false,
"autoTopicCreationOverride" : {
  "allowAutoTopicCreation" : false,
  "topicType" : "non-partitioned",
  "defaultNumPartitions" : 0
},
"autoSubscriptionCreationOverride" : {
  "allowAutoSubscriptionCreation" : false
},
"publishMaxMessageRate" : { },
"latency_stats_sample_rate" : { },
"message_ttl_in_seconds" : 0,
"subscription_expiration_time_minutes" : 0,
"retention_policies" : {
  "retentionTimeInMinutes" : 720,
  "retentionSizeInMB" : -1
},
"deleted" : false,
"encryption_required" : false,
"delayed_delivery_policies" : {
  "tickTime" : 15,
  "active" : false
},
"inactive_topic_policies" : {
  "inactiveTopicDeleteMode" : "delete_when_no_subscriptions",
  "maxInactiveDurationSeconds" : -1,
  "deleteWhileInactive" : false
},
"subscription_auth_mode" : "None",
"max_producers_per_topic" : 100,
"max_consumers_per_topic" : 100,
"max_consumers_per_subscription" : 0,
"max_unacked_messages_per_consumer" : 100,
"max_unacked_messages_per_subscription" : 10000,
"offload_threshold" : -1,
"schema_auto_update_compatibility_strategy" : "AutoUpdateDisabled",
"schema_compatibility_strategy" : "UNDEFINED",
"is_allow_auto_update_schema" : true,
"schema_validation_enforced" : false,
"subscription_types_enabled" : [ ],
```

Output of `partitioned-stats` for the topic:

```
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 1064426,
  "msgInCounter" : 1471,
  "bytesOutCounter" : 1077416,
  "msgOutCounter" : 1471,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : -10905,
  "backlogSize" : -12990,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "publishers" : [ ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "chariot_subscription-perform_badging-perform_badging_1" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 1077416,
      "msgOutCounter" : 1471,
      "msgRateRedeliver" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 0,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "lastMarkDeleteAdvancedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "availablePermits" : 20,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "44713:1",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 36,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "nonContiguousDeletedMessagesRanges" : 0,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 36,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  },
  "metadata" : {
    "partitions" : 4
  },
  "partitions" : { }
}
```
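The stats output above shows negative `storageSize` and `backlogSize` at the topic level while the only subscription reports `msgBacklog` of 0. The following is a rough sketch of a check that flags that combination in a saved `partitioned-stats` document. The function name `find_backlog_anomalies` is hypothetical, the field names are taken from the output above, and treating these values as suspicious is an assumption; whether they are related to the quota bug is not established here.

```python
import json
from typing import Any, Dict, List


def find_backlog_anomalies(stats: Dict[str, Any]) -> List[str]:
    """Return human-readable notes about suspicious backlog fields (hypothetical helper)."""
    findings: List[str] = []

    # Sizes are byte counts, so negative values are unexpected.
    for key in ("storageSize", "backlogSize"):
        value = stats.get(key, 0)
        if value < 0:
            findings.append(f"{key} is negative ({value})")

    # Every subscription is drained, yet the topic-level backlog is nonzero.
    subs = stats.get("subscriptions", {})
    drained = all(sub.get("msgBacklog", 0) == 0 for sub in subs.values())
    if subs and drained and stats.get("backlogSize", 0) != 0:
        findings.append(
            "all subscriptions report msgBacklog 0 but topic backlogSize is nonzero"
        )
    return findings


# Trimmed-down copy of the relevant fields from the stats output above.
sample = json.loads(
    """
    {
      "storageSize": -10905,
      "backlogSize": -12990,
      "subscriptions": {
        "chariot_subscription-perform_badging-perform_badging_1": {"msgBacklog": 0}
      }
    }
    """
)
anomalies = find_backlog_anomalies(sample)
```

Running the same function over stats captured before and after a topic unload could show whether these values change when the error temporarily goes away.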
