This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9fbb92a69b5 [improve][pip] PIP-303: Add optional parameters for
getPartitionedStats (#21228)
9fbb92a69b5 is described below
commit 9fbb92a69b5f392212e086cbac8cd24c49fecb29
Author: crossoverJie <[email protected]>
AuthorDate: Mon Nov 20 16:34:31 2023 +0800
[improve][pip] PIP-303: Add optional parameters for getPartitionedStats
(#21228)
---
pip/pip-303.md | 224 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 224 insertions(+)
diff --git a/pip/pip-303.md b/pip/pip-303.md
new file mode 100644
index 00000000000..53861631cf9
--- /dev/null
+++ b/pip/pip-303.md
@@ -0,0 +1,224 @@
+
+# Motivation
+
+When a topic has a large number of producers or consumers (more than 1,000), calling `pulsarAdmin.topics().getPartitionedStats()` is slow and the response is large.
+It is therefore useful to let users choose whether producer and consumer information is included in the response.
+
+
+
+# Goals
+
+## In Scope
+
+Add the following methods to `org.apache.pulsar.client.admin.Topics`:
+```java
+CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
+ String topic, boolean perPartition, GetStatsOptions getStatsOptions);
+
+CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions);
+```
+
+
+
+## Out of Scope
+
+None.
+
+
+# High Level Design
+
+Implement the `getPartitionedStatsAsync` method, and add the `excludePublishers` and `excludeConsumers` parameters to the `{tenant}/{namespace}/{topic}/partitioned-stats` API in `PersistentTopics` and `NonPersistentTopics`.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+
+Add two fields to `org.apache.pulsar.client.admin.GetStatsOptions`:
+```java
+@Data
+@Builder
+public class GetStatsOptions {
+ /**
+ * Whether to exclude publishers.
+ */
+ private final boolean excludePublishers;
+
+ /**
+ * Whether to exclude consumers.
+ */
+ private final boolean excludeConsumers;
+
+}
+```
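As a rough illustration of how a caller might set the two new flags, the sketch below hand-writes the builder that Lombok's `@Builder` would generate, so that it compiles without Lombok. `StatsOptionsSketch` is a hypothetical stand-in, not the real `GetStatsOptions` class, and it models only the two fields added by this proposal.

```java
// Hypothetical stand-in for the Lombok-generated GetStatsOptions; illustration only.
final class StatsOptionsSketch {
    private final boolean excludePublishers;
    private final boolean excludeConsumers;

    private StatsOptionsSketch(Builder b) {
        this.excludePublishers = b.excludePublishers;
        this.excludeConsumers = b.excludeConsumers;
    }

    // Getter names follow the isXxx convention used for boolean fields.
    boolean isExcludePublishers() { return excludePublishers; }
    boolean isExcludeConsumers() { return excludeConsumers; }

    static Builder builder() { return new Builder(); }

    static final class Builder {
        // Both flags default to false, so existing behavior is kept unless a caller opts in.
        private boolean excludePublishers;
        private boolean excludeConsumers;

        Builder excludePublishers(boolean value) { this.excludePublishers = value; return this; }
        Builder excludeConsumers(boolean value) { this.excludeConsumers = value; return this; }
        StatsOptionsSketch build() { return new StatsOptionsSketch(this); }
    }

    public static void main(String[] args) {
        // Ask for stats without the per-publisher entries; consumers stay included.
        StatsOptionsSketch opts = StatsOptionsSketch.builder().excludePublishers(true).build();
        System.out.println(opts.isExcludePublishers() + " " + opts.isExcludeConsumers());
    }
}
```

Because an unset flag is `false`, callers that never touch the new fields would still receive publisher and consumer details.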
+
+Implement the `getPartitionedStatsAsync` and `getStatsAsync` methods in `org.apache.pulsar.client.admin.internal.TopicsImpl`:
+```java
+@Override
+public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
+        String topic, boolean perPartition, GetStatsOptions getStatsOptions) {
+    TopicName tn = validateTopic(topic);
+    WebTarget path = topicPath(tn, "partitioned-stats");
+    // Forward every option to the broker as a query parameter.
+    path = path.queryParam("perPartition", perPartition)
+            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
+            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
+            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
+            .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
+            .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
+    // Sending the request and returning the future are omitted from this excerpt.
+}
+
+@Override
+public CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions) {
+    TopicName tn = validateTopic(topic);
+    // Forward every option to the broker as a query parameter.
+    WebTarget path = topicPath(tn, "stats")
+            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
+            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
+            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
+            .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
+            .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
+    // Sending the request and returning the future are omitted from this excerpt.
+}
+```
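To make the wire format concrete, the following sketch assembles the query string that such a call might append to the `partitioned-stats` path. It uses only the JDK rather than the JAX-RS `WebTarget` used above; `StatsQuerySketch` and `partitionedStatsQuery` are hypothetical names, and only three of the parameters are modeled for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; the real client builds the request through WebTarget.queryParam.
final class StatsQuerySketch {
    static String partitionedStatsQuery(boolean perPartition,
                                        boolean excludePublishers,
                                        boolean excludeConsumers) {
        // LinkedHashMap keeps insertion order, so the output is deterministic.
        Map<String, Boolean> params = new LinkedHashMap<>();
        params.put("perPartition", perPartition);
        params.put("excludePublishers", excludePublishers);
        params.put("excludeConsumers", excludeConsumers);
        return params.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        System.out.println("partitioned-stats?" + partitionedStatsQuery(true, true, false));
    }
}
```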
+
+Add the `excludePublishers` and `excludeConsumers` parameters to the `{tenant}/{namespace}/{topic}/partitioned-stats` API:
+```java
+@GET
+@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
+public void getPartitionedStats(
+        @Suspended final AsyncResponse asyncResponse,
+        @ApiParam(value = "Specify the tenant", required = true)
+        @PathParam("tenant") String tenant,
+        @ApiParam(value = "Specify the namespace", required = true)
+        @PathParam("namespace") String namespace,
+        @ApiParam(value = "Specify topic name", required = true)
+        @PathParam("topic") @Encoded String encodedTopic,
+        @ApiParam(value = "Get per partition stats")
+        @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
+        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+        @ApiParam(value = "If return precise backlog or imprecise backlog")
+        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                + "not to use when there's heavy traffic.")
+        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
+        @ApiParam(value = "If return the earliest time in backlog")
+        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
+        @ApiParam(value = "If exclude the publishers")
+        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
+        @ApiParam(value = "If exclude the consumers")
+        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
+
+```
+
+Add the `excludePublishers` and `excludeConsumers` parameters to the `{tenant}/{namespace}/{topic}/stats` API:
+```java
+@GET
+@Path("{tenant}/{namespace}/{topic}/stats")
+public void getStats(
+        @Suspended final AsyncResponse asyncResponse,
+        @ApiParam(value = "Specify the tenant", required = true)
+        @PathParam("tenant") String tenant,
+        @ApiParam(value = "Specify the namespace", required = true)
+        @PathParam("namespace") String namespace,
+        @ApiParam(value = "Specify topic name", required = true)
+        @PathParam("topic") @Encoded String encodedTopic,
+        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+        @ApiParam(value = "If return precise backlog or imprecise backlog")
+        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                + "not to use when there's heavy traffic.")
+        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
+        @ApiParam(value = "If return time of the earliest message in backlog")
+        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
+        @ApiParam(value = "If exclude the publishers")
+        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
+        @ApiParam(value = "If exclude the consumers")
+        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
+```
+
+Add a new method to `org.apache.pulsar.broker.service.Topic`:
+```java
+CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions);
+
+@Data
+@Builder
+public class GetStatsOptions {
+ /**
+ * Set to true to get the precise backlog; otherwise get the imprecise backlog.
+ */
+ private final boolean getPreciseBacklog;
+
+ /**
+ * Whether to get backlog size for each subscription.
+ */
+ private final boolean subscriptionBacklogSize;
+
+ /**
+ * Whether to get the earliest time in backlog.
+ */
+ private final boolean getEarliestTimeInBacklog;
+
+ /**
+ * Whether to exclude publishers.
+ */
+ private final boolean excludePublishers;
+
+ /**
+ * Whether to exclude consumers.
+ */
+ private final boolean excludeConsumers;
+
+}
+```
+
+Add the following logic to `org.apache.pulsar.broker.service.persistent.PersistentTopic.asyncGetStats` and `org.apache.pulsar.broker.service.persistent.PersistentSubscription.getStats`:
+
+```java
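+// Skip the per-publisher entries when the caller asked to exclude them.
+if (!excludePublishers) {
+    stats.addPublisher(publisherStats);
+}
+
+// Skip the per-consumer entries when the caller asked to exclude them.
+if (!excludeConsumers) {
+    subStats.consumers.add(consumerStats);
+}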
+```
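The effect of these two checks can be sketched in isolation. The example below is a simplified model, not the broker code: `StatsFilterSketch`, `TopicStatsSketch` and `collect` are hypothetical names, and publishers and consumers are represented as plain strings instead of stats objects.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the exclusion logic; not the actual broker classes.
final class StatsFilterSketch {
    static final class TopicStatsSketch {
        final List<String> publishers = new ArrayList<>();
        final List<String> consumers = new ArrayList<>();
    }

    static TopicStatsSketch collect(List<String> producerNames, List<String> consumerNames,
                                    boolean excludePublishers, boolean excludeConsumers) {
        TopicStatsSketch stats = new TopicStatsSketch();
        // Per-publisher entries are copied only when they were not excluded.
        if (!excludePublishers) {
            stats.publishers.addAll(producerNames);
        }
        // Per-consumer entries are copied only when they were not excluded.
        if (!excludeConsumers) {
            stats.consumers.addAll(consumerNames);
        }
        return stats;
    }

    public static void main(String[] args) {
        List<String> producers = new ArrayList<>();
        producers.add("producer-a");
        List<String> consumers = new ArrayList<>();
        consumers.add("consumer-a");
        TopicStatsSketch stats = collect(producers, consumers, true, false);
        System.out.println(stats.publishers.size() + " publishers, " + stats.consumers.size() + " consumers");
    }
}
```

With both flags left at `false`, the result carries every entry, which matches the default values declared on the REST parameters above.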
+
+## Public-facing Changes
+
+
+### Public API
+
+### Binary protocol
+
+### Configuration
+
+### CLI
+
+### Metrics
+
+
+
+# Monitoring
+
+
+
+# Security Considerations
+
+
+# Backward & Forward Compatibility
+
+## Revert
+
+
+## Upgrade
+
+# Alternatives
+
+# General Notes
+
+# Links
+
+<!--
+Updated afterwards
+-->
+* Mailing List discussion thread: https://lists.apache.org/thread/c92043zq6lyrsd5z1hnln48mx858n7vj
+* Mailing List voting thread: https://lists.apache.org/thread/hjw3y7h5vd0x7st6zslj3btjcd6yf1lx