codelipenghui commented on code in PR #21709:
URL: https://github.com/apache/pulsar/pull/21709#discussion_r1423362893


##########
pip/pip-323.md:
##########
@@ -0,0 +1,167 @@
+# PIP-323: Complete Backlog Quota Telemetry
+
+# Background knowledge
+
+## Backlog
+
+A topic in Pulsar is the place where messages are written to. They are 
consumed by subscriptions. A topic can have many
+subscriptions, and it is those that maintain the state of message acknowledgment, per subscription - which messages
+were acknowledged and which were not. 
+
+A subscription backlog is the set of unacknowledged messages in that 
subscription.
+A subscription backlog size is the sum of the sizes of the unacknowledged messages (in bytes).
+
+Since a topic can have many subscriptions, and each has its own backlog, how 
does one define a backlog for a topic?
+A topic backlog is defined as the backlog of the subscription which has the 
**oldest** unacknowledged message. 
+Since acknowledged messages can be interleaved with unacknowledged messages, 
calculating the exact size of that 
+subscription backlog can be expensive as it requires I/O operations to read 
the messages from the ledgers.
+For that reason, the topic backlog size is actually defined to be the 
*estimated* backlog size of that subscription. 
+The estimate is computed by summing the sizes of all the ledgers, starting from the current active one (the one being written to),
+up to the ledger which contains the oldest unacknowledged message for that 
subscription (There is actually a faster 
+way to calculate it, but this was the definition chosen for this estimation in 
Pulsar).
+
+A topic backlog age is the age of the oldest unacknowledged message (same 
subscription as defined for topic backlog size).
+If that message was written 30 minutes ago, its age is 30 minutes, and so is 
the topic backlog age.
+
+## Backlog Quota
+
+Pulsar has a feature called [backlog quota](https://pulsar.apache.org/docs/3.1.x/cookbooks-retention-expiry/#backlog-quotas).
+It allows a user to define a quota - in effect, a limit - which limits the 
topic backlog.
+There are two types of quotas:
+
+1. Size based: The limit is for the topic backlog size (as we defined above).
+2. Time based: The limit is for the topic backlog age (as we defined above).
+
+Once a topic backlog exceeds either one of those limits, an action is taken to 
hold the backlog to that limit:
+
+* The producer write is placed on hold for a certain amount of time before 
failing.
+* The producer write is failed
+* The subscriptions' oldest unacknowledged messages are acknowledged in order until both the topic backlog size and
+  age fall within the limit (quota). This process is called backlog eviction (it happens every interval).
+
+The quotas can be defined as a default value for any topic, by using the 
following broker configuration keys:
+`backlogQuotaDefaultLimitBytes` and `backlogQuotaDefaultLimitSecond`.
+
+The quota can also be specified directly for all topics in a given namespace 
using the namespace policy, 
+or a specific topic using a topic policy. 
+
+## Monitoring Backlog Quota
+
+The user today can calculate quota used for size based limit, since there are 
two metrics exposed today on 
+a topic level: `pulsar_storage_backlog_quota_limit` and 
`pulsar_storage_backlog_size`. 
+You can just divide the two to get a percentage and know how close the topic backlog is to its size limit.
+
+For the time-based limit, the only metric exposed today is the quota itself - 
`pulsar_storage_backlog_quota_limit_time`
+
+## Backlog Quota Eviction in the Broker
+
+The broker has a method called `BrokerService.monitorBacklogQuota()`. It is 
scheduled to run every x seconds,
+as defined by the configuration `backlogQuotaCheckIntervalInSeconds`. 
+This method loops over all persistent topics, and for each topic it checks whether the topic backlog exceeded
+either one of those quotas.
+
+As mentioned before, checking backlog size is a memory-only calculation, since
+each topic has the list of ledgers stored in-memory, including the size of 
each ledger. Same goes for the subscriptions,
+they are all stored in memory, and the `ManagedCursor` keeps track of the subscription with the oldest unacknowledged
+message, thus retrieving it is O(1). Checking backlog based on time is costly if the configuration key
+`preciseTimeBasedBacklogQuotaCheck` is set to true. In that case, it needs to read the oldest message to obtain
+its publish timestamp, which is expensive in terms of I/O. If it is set to false, it's in-memory access only, since
+it uses the age of the ledger instead of the message, and the ledger metadata is kept in memory.
+
+For each topic which has exceeded its quota, if the policy chosen is eviction, then the process is performed
+synchronously. This process consumes I/O, as it needs to read messages (using skip) to know where to stop acknowledging
+messages.
+
+
+# Motivation
+
+Users who have defined a time-based backlog quota have no means today to monitor the backlog quota usage,
+time-wise, to know whether the topic backlog is close to its time limit or has even passed it.
+
+If it has passed it, the user has no means to know if it happened, when and 
how many times.
+
+
+# Goals
+
+## In Scope
+- Allow the user to know the backlog quota usage for time-based quota, per 
topic
+- Allow the user to know how many times backlog eviction happened, and for 
which backlog quota type
+
+## Out of Scope
+
+None
+
+
+# High Level Design
+
+We'll use the existing backlog monitoring process running in intervals. For 
each topic, the subscription with 
+the oldest unacknowledged message is retrieved, to calculate the topic backlog 
age. At that point, we will
+cache the following for the oldest unacknowledged message:
+* Subscription name 
+* Message position
+* Message publish timestamp
+
+That cache will allow us to add a metric exposing the topic backlog age - 
`pulsar_storage_backlog_age_seconds`, 
+which will be both consistent (same ones used for deciding on backlog 
eviction) and cheap to retrieve 
+(no additional I/O involved). 
+Coupled with the existing `pulsar_storage_backlog_quota_limit_time` metric, 
the user can use both to divide and
+get the usage of the quota (both are in seconds units).
+
+We will add the subscription name containing the oldest unacknowledged message 
to the Admin API
+topic stats endpoints (`{tenant}/{namespace}/{topic}/stats` and 
`{tenant}/{namespace}/{topic}/partitioned-stats`),
+allowing the user a complete workflow: alert using metrics when topic backlog 
is about to be exceeded, then
+query topic stats for that topic to retrieve the subscription name which 
contains the oldest message.
+For completeness, we will also add the backlog quota limits, both age and 
size, and the age of oldest 
+unacknowledged message.
+
+We will add a metric allowing the user to know how many times the usage exceeded the quota, for either time or size -
+`pulsar_storage_backlog_quota_exceeded_evictions_total`, where the `quota_type` label will be either `time` or
+`size`. Monitoring that counter over time will allow the user to know when a topic backlog exceeded its quota
+and, if backlog eviction was chosen as the action, that it happened and how many times.
+
+Some users may want the backlog quota check to happen more frequently, and as 
a consequence, the backlog age 
+metric more frequently updated. They can modify 
`backlogQuotaCheckIntervalInSeconds` configuration key, but without
+knowing how long this check takes, it will be hard for them. Hence, we will 
add the metric
+`pulsar_storage_backlog_quota_check_duration_seconds` which will be of 
histogram type.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Public API
+Adding the following to the response of topic stats, of both 
`{tenant}/{namespace}/{topic}/stats` 
+and `{tenant}/{namespace}/{topic}/partitioned-stats`:
+
+* `backlogQuotaLimitSize` - the size in bytes of the topic backlog quota 
+* `backlogQuotaLimitTime` - the topic backlog age quota, in seconds. 
+* `oldestBacklogMessageAgeSeconds` - the age of the oldest unacknowledged 
(i.e. backlog) message, measured by 
+   the time elapsed from its published time, in seconds. This value is 
recorded every backlog quota check 
+   interval, hence it represents the value seen in the last check.
+* `oldestBacklogMessageSubscriptionName` - the name of the subscription 
containing the oldest unacknowledged message.
+  This value is recorded every backlog quota check interval, hence it 
represents the value seen in the last check.
+
+
+### Metrics
+
+| Name                                                    | Description                                                                        | Attributes                        | Units   |
+|---------------------------------------------------------|------------------------------------------------------------------------------------|-----------------------------------|---------|
+| `pulsar_storage_backlog_age_seconds`                    | Gauge. The age of the oldest unacknowledged message (backlog)                      | cluster, topic                    | seconds |
+| `pulsar_storage_backlog_quota_exceeded_evictions_total` | Counter. The number of times a backlog was evicted since it has exceeded its quota | cluster, topic, quota_type = time |         |

Review Comment:
   Do we need to consider adding broker-level and namespace-level metrics?
   
   - pulsar_broker_storage_backlog_evictions_total (cluster)
   - pulsar_ns_storage_backlog_evictions_total (cluster, namespace)
   - pulsar_topic_storage_backlog_evictions_total (cluster, namespace, topic)
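
   One way to get all three levels without tracking them separately would be to count at topic granularity and sum upward. A minimal sketch, assuming a hypothetical `BacklogEvictionCounters` helper (none of these class or method names exist in Pulsar, and the real broker would expose the totals through its own metrics layer):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch, not Pulsar's metrics code: evictions are recorded once
// per topic and quota type, and the coarser levels are derived by summing.
final class BacklogEvictionCounters {
    // namespace -> topic -> quota type ("time" or "size") -> eviction count
    private final Map<String, Map<String, Map<String, LongAdder>>> counts =
            new ConcurrentHashMap<>();

    // Called whenever backlog eviction runs for a topic.
    void recordEviction(String namespace, String topic, String quotaType) {
        counts.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
              .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
              .computeIfAbsent(quotaType, q -> new LongAdder())
              .increment();
    }

    // Topic level: would carry the labels (cluster, namespace, topic).
    long topicTotal(String namespace, String topic) {
        Map<String, LongAdder> byType =
                counts.getOrDefault(namespace, Map.of()).getOrDefault(topic, Map.of());
        return byType.values().stream().mapToLong(LongAdder::sum).sum();
    }

    // Namespace level: sum over every topic in the namespace.
    long namespaceTotal(String namespace) {
        return counts.getOrDefault(namespace, Map.of()).keySet().stream()
                .mapToLong(topic -> topicTotal(namespace, topic))
                .sum();
    }

    // Broker level: sum over every namespace served by this broker.
    long brokerTotal() {
        return counts.keySet().stream().mapToLong(this::namespaceTotal).sum();
    }
}
```

   With this shape the namespace and broker totals stay consistent with the topic-level counter by construction, at the cost of a scan at scrape time.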



##########
pip/pip-323.md:
##########
@@ -0,0 +1,167 @@
+# PIP-323: Complete Backlog Quota Telemetry
+
+### Metrics
+
+| Name                                                    | Description                                                                        | Attributes                        | Units   |
+|---------------------------------------------------------|------------------------------------------------------------------------------------|-----------------------------------|---------|
+| `pulsar_storage_backlog_age_seconds`                    | Gauge. The age of the oldest unacknowledged message (backlog)                      | cluster, topic                    | seconds |
+| `pulsar_storage_backlog_quota_exceeded_evictions_total` | Counter. The number of times a backlog was evicted since it has exceeded its quota | cluster, topic, quota_type = time |         |

Review Comment:
   Actually, I don't remember why we had `storage` in `pulsar_storage_backlog_size` before.
   Now, I don't think it adds anything.
   
   `pulsar_backlog_size` also looks good.
   
   Similarly, maybe we can consider using
   
   - pulsar_broker_backlog_evictions_total (cluster)
   - pulsar_ns_backlog_evictions_total (cluster, namespace)
   - pulsar_topic_backlog_evictions_total (cluster, namespace, topic)



##########
pip/pip-323.md:
##########
@@ -0,0 +1,167 @@
+# PIP-323: Complete Backlog Quota Telemetry
+
+### Metrics
+
+| Name                                                    | Description                                                                        | Attributes                        | Units   |
+|---------------------------------------------------------|------------------------------------------------------------------------------------|-----------------------------------|---------|
+| `pulsar_storage_backlog_age_seconds`                    | Gauge. The age of the oldest unacknowledged message (backlog)                      | cluster, topic                    | seconds |

Review Comment:
   Do we need to mention that this one is only available if you enable topic-level metrics?
   At the namespace and broker levels, we will not support this metric.
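
   To make the question concrete, here is a rough sketch of the gating I have in mind. It assumes the switch is the broker's `exposeTopicLevelMetricsInPrometheus` setting; `BacklogAgeGauge` and `render` are hypothetical names used only for illustration, not existing Pulsar code:

```java
import java.util.Optional;

// Hypothetical sketch: the age gauge is emitted only when topic-level
// metrics are enabled; there is no namespace- or broker-level fallback.
final class BacklogAgeGauge {
    static Optional<String> render(boolean topicLevelMetricsEnabled,
                                   String cluster, String topic, long ageSeconds) {
        if (!topicLevelMetricsEnabled) {
            // Nothing is exposed at coarser levels for this metric.
            return Optional.empty();
        }
        // One Prometheus text-format sample line for the topic.
        return Optional.of(String.format(
                "pulsar_storage_backlog_age_seconds{cluster=\"%s\",topic=\"%s\"} %d",
                cluster, topic, ageSeconds));
    }
}
```

   If that is the intended behavior, the Metrics table should probably say so explicitly.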



##########
pip/pip-323.md:
##########
@@ -0,0 +1,167 @@
+# PIP-323: Complete Backlog Quota Telemetry
+
+# Background knowledge
+
+## Backlog
+
+A topic in Pulsar is the place where messages are written to. They are 
consumed by subscriptions. A topic can have many
+subscriptions, and it is those that maintain the state of message acknowledgment, per subscription - which messages
+were acknowledged and which were not. 
+
+A subscription backlog is the set of unacknowledged messages in that 
subscription.
+A subscription backlog size is the sum of the sizes of the unacknowledged messages (in bytes).
+
+Since a topic can have many subscriptions, and each has its own backlog, how 
does one define a backlog for a topic?
+A topic backlog is defined as the backlog of the subscription which has the 
**oldest** unacknowledged message. 
+Since acknowledged messages can be interleaved with unacknowledged messages, 
calculating the exact size of that 
+subscription backlog can be expensive as it requires I/O operations to read 
the messages from the ledgers.
+For that reason, the topic backlog size is actually defined to be the 
*estimated* backlog size of that subscription. 
+The estimate is computed by summing the sizes of all the ledgers, starting from the current active one (the one being written to),
+up to the ledger which contains the oldest unacknowledged message for that 
subscription (There is actually a faster 
+way to calculate it, but this was the definition chosen for this estimation in 
Pulsar).
+
+A topic backlog age is the age of the oldest unacknowledged message (same 
subscription as defined for topic backlog size).
+If that message was written 30 minutes ago, its age is 30 minutes, and so is 
the topic backlog age.
+
+## Backlog Quota
+
+Pulsar has a feature called [backlog quota](https://pulsar.apache.org/docs/3.1.x/cookbooks-retention-expiry/#backlog-quotas).
+It allows a user to define a quota - in effect, a limit - which limits the 
topic backlog.
+There are two types of quotas:
+
+1. Size based: The limit is for the topic backlog size (as we defined above).
+2. Time based: The limit is for the topic backlog age (as we defined above).
+
+Once a topic backlog exceeds either one of those limits, an action is taken to 
hold the backlog to that limit:
+
+* The producer write is placed on hold for a certain amount of time before 
failing.
+* The producer write is failed
+* The subscriptions' oldest unacknowledged messages are acknowledged in order until both the topic backlog size and
+  age fall within the limit (quota). This process is called backlog eviction (it happens every interval).
+
+The quotas can be defined as a default value for any topic, by using the 
following broker configuration keys:
+`backlogQuotaDefaultLimitBytes` and `backlogQuotaDefaultLimitSecond`.
+
+The quota can also be specified directly for all topics in a given namespace 
using the namespace policy, 
+or a specific topic using a topic policy. 
+
+## Monitoring Backlog Quota
+
+The user today can calculate quota used for size based limit, since there are 
two metrics exposed today on 
+a topic level: `pulsar_storage_backlog_quota_limit` and 
`pulsar_storage_backlog_size`. 
+You can just divide the two to get a percentage and know how close the topic backlog is to its size limit.
+
+For the time-based limit, the only metric exposed today is the quota itself - 
`pulsar_storage_backlog_quota_limit_time`.
+
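The size-based division described above can be sketched as follows. This is a minimal illustration: the function name `size_quota_usage` is hypothetical, the inputs stand for sampled values of `pulsar_storage_backlog_size` and `pulsar_storage_backlog_quota_limit`, and treating a non-positive limit as "no quota configured" is an assumption of this sketch.

```python
def size_quota_usage(backlog_size_bytes, quota_limit_bytes):
    """Return the fraction of the size quota in use, or None if no quota applies.

    A non-positive limit is treated as "no quota" (an assumption of this sketch).
    """
    if quota_limit_bytes <= 0:
        return None
    return backlog_size_bytes / quota_limit_bytes


# Example: a 7.5 GB backlog against a 10 GB quota.
usage = size_quota_usage(backlog_size_bytes=7_500_000_000,
                         quota_limit_bytes=10_000_000_000)
```

A value approaching 1.0 means the topic backlog is close to its size limit.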
+## Backlog Quota Eviction in the Broker
+
+The broker has a method called `BrokerService.monitorBacklogQuota()`. It is 
scheduled to run every x seconds,
+as defined by the configuration `backlogQuotaCheckIntervalInSeconds`. 
+This method loops over all persistent topics, and for each topic it checks 
whether the topic backlog exceeded
+either one of those limits. 
+
+As mentioned before, checking backlog size is a memory-only calculation, since
+each topic has the list of ledgers stored in-memory, including the size of 
each ledger. Same goes for the subscriptions,
+they are all stored in memory, and the `ManagedCursor` keeps track of the 
subscription with the oldest unacknowledged 
+message, thus retrieving it is O(1). Checking backlog based on time is costly 
if the configuration key
+`preciseTimeBasedBacklogQuotaCheck` is set to true. In that case, it needs to 
read the oldest message to obtain
+its publish timestamp, which is expensive in terms of I/O. If it is set to 
false, it's in-memory access only, since
+it uses the age of the ledger instead of the message, and the ledger metadata 
is kept in memory.
+
+For each topic which has exceeded its quota, if the policy chosen is eviction, 
then the process is performed
+synchronously. This process consumes I/O, as it needs to read messages (using 
skip) to know where to stop acknowledging
+messages.
+
+
+# Motivation
+
+Users who have defined a time-based backlog quota have no means today to 
monitor the backlog quota usage, 
+time-wise, to know whether the topic backlog is close to its time limit or 
has even passed it.
+
+If it has passed the limit, the user has no means to know that it happened, 
when, or how many times.
+
+
+# Goals
+
+## In Scope
+- Allow the user to know the backlog quota usage for time-based quota, per 
topic
+- Allow the user to know how many times backlog eviction happened, and for 
which backlog quota type
+
+## Out of Scope
+
+None
+
+
+# High Level Design
+
+We'll use the existing backlog monitoring process running in intervals. For 
each topic, the subscription with 
+the oldest unacknowledged message is retrieved, to calculate the topic backlog 
age. At that point, we will
+cache the following for the oldest unacknowledged message:
+* Subscription name 
+* Message position
+* Message publish timestamp
+
+That cache will allow us to add a metric exposing the topic backlog age - 
`pulsar_storage_backlog_age_seconds`, 
+which will be both consistent (the same value used for deciding on backlog 
eviction) and cheap to retrieve 
+(no additional I/O involved). 
+Coupled with the existing `pulsar_storage_backlog_quota_limit_time` metric, 
the user can divide the age by the limit to
+get the usage of the quota (both are in seconds).
+
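The cached record and the age/limit division described above could look roughly like the following sketch. All names here (`OldestBacklogMessage`, `backlog_age_seconds`, `time_quota_usage`) are hypothetical and chosen for illustration; the actual broker classes may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OldestBacklogMessage:
    """What the quota check caches about the oldest unacknowledged message."""
    subscription_name: str
    position: tuple  # (ledger_id, entry_id), an illustrative representation
    publish_time_seconds: float


def backlog_age_seconds(cached, check_time_seconds):
    """Age of the cached message at the time of the check; no I/O needed."""
    return max(0.0, check_time_seconds - cached.publish_time_seconds)


def time_quota_usage(age_seconds, limit_seconds):
    """Fraction of the time quota in use, or None when no limit is set."""
    if limit_seconds <= 0:
        return None
    return age_seconds / limit_seconds


# A message published at t=1000s, checked at t=2800s, with a 1-hour limit.
cached = OldestBacklogMessage("sub-a", (12, 34), publish_time_seconds=1000.0)
age = backlog_age_seconds(cached, check_time_seconds=2800.0)
usage = time_quota_usage(age, limit_seconds=3600)
```

Here the backlog age is 30 minutes against a one-hour limit, so half of the time quota is in use. Because the age is derived from the cached publish timestamp, reading the metric does not require touching the ledgers again.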
+We will add the subscription name containing the oldest unacknowledged message 
to the Admin API
+topic stats endpoints (`{tenant}/{namespace}/{topic}/stats` and 
`{tenant}/{namespace}/{topic}/partitioned-stats`),
+allowing the user a complete workflow: alert using metrics when the topic 
backlog quota is about to be exceeded, then
+query topic stats for that topic to retrieve the subscription name which 
contains the oldest message.
+For completeness, we will also add the backlog quota limits, both age and 
size, and the age of the oldest 
+unacknowledged message.
+
+We will add a metric allowing the user to know how many times the usage 
exceeded the quota, for either time or size -
+`pulsar_storage_backlog_quota_exceeded_evictions_total`, where the 
`quota_type` label will be either `time` or 
+`size`. Monitoring that counter over time will allow the user to know when a 
topic backlog exceeded its quota
+and, if backlog eviction was chosen as the action, that eviction happened and 
how many times. 
+
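As an illustration of monitoring that counter over time, the sketch below turns periodic samples of a monotonically increasing counter into a list of eviction events. The function name `eviction_events` and the sample format are assumptions for this example; a decrease between samples is interpreted as a counter reset (for instance after a broker restart), which mirrors how counters are commonly handled by metrics systems.

```python
def eviction_events(samples):
    """Return (timestamp, increase) pairs for each interval with evictions.

    `samples` is a list of (timestamp_seconds, counter_value) tuples sorted
    by timestamp. A drop in the counter value is treated as a reset, so the
    new value is counted as the increase for that interval.
    """
    events = []
    for (_, previous), (timestamp, current) in zip(samples, samples[1:]):
        increase = current - previous if current >= previous else current
        if increase > 0:
            events.append((timestamp, increase))
    return events


# Counter sampled every 60 seconds for one topic and one quota_type.
samples = [(0, 0), (60, 0), (120, 2), (180, 2), (240, 3)]
events = eviction_events(samples)
```

With these samples, two evictions are attributed to the interval ending at 120 seconds and one to the interval ending at 240 seconds.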
+Some users may want the backlog quota check to happen more frequently, and as 
a consequence, the backlog age 
+metric to be updated more frequently. They can modify the 
`backlogQuotaCheckIntervalInSeconds` configuration key, but without
+knowing how long this check takes, it will be hard for them to choose a 
suitable value. Hence, we will add the metric
+`pulsar_storage_backlog_quota_check_duration_seconds` which will be of 
histogram type.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Public API
+Adding the following to the response of topic stats, of both 
`{tenant}/{namespace}/{topic}/stats` 
+and `{tenant}/{namespace}/{topic}/partitioned-stats`:
+
+* `backlogQuotaLimitSize` - the size in bytes of the topic backlog quota 
+* `backlogQuotaLimitTime` - the topic backlog age quota, in seconds. 
+* `oldestBacklogMessageAgeSeconds` - the age of the oldest unacknowledged 
(i.e. backlog) message, measured by 
+   the time elapsed from its publish time, in seconds. This value is 
recorded every backlog quota check 
+   interval, hence it represents the value seen in the last check.
+* `oldestBacklogMessageSubscriptionName` - the name of the subscription 
containing the oldest unacknowledged message.
+  This value is recorded every backlog quota check interval, hence it 
represents the value seen in the last check.
+
+
+### Metrics
+
+| Name | Description | Attributes | Units |
+|------|-------------|------------|-------|
+| `pulsar_storage_backlog_age_seconds` | Gauge. The age of the oldest unacknowledged message (backlog) | cluster, topic | seconds |

Review Comment:
   Do we also need to add the namespace label even if we have the topic label? 
Usually, if you check the dashboard from Grafana, you will select the namespace 
first and then select the topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
