nodece commented on code in PR #21228:
URL: https://github.com/apache/pulsar/pull/21228#discussion_r1395722267
##########
pip/pip-303.md:
##########
@@ -0,0 +1,202 @@
+
+# Motivation
+
+When a topic has a large number of producers or consumers (over 1k), querying the `pulsarAdmin.topics().getPartitionedStats()` interface is slow and the response is large.
+As a result, it's essential to give users the option of excluding producer and consumer information from the query.
+
+
+
+# Goals
+
+## In Scope
+
+Add the following methods to `org.apache.pulsar.client.admin.Topics`:
+```java
+CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
+ String topic, boolean perPartition, GetStatsOptions getStatsOptions);
+
+CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions);
+```
+
+
+
+## Out of Scope
+
+None.
+
+
+# High Level Design
+
+Implement the `getPartitionedStatsAsync` method, and add the `excludePublishers` and `excludeConsumers` parameters to the `{tenant}/{namespace}/{topic}/partitioned-stats` API in `PersistentTopics` and `NonPersistentTopics`.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+
+Add two fields to `org.apache.pulsar.client.admin.GetStatsOptions`:
+```java
+@Data
+@Builder
+public class GetStatsOptions {
+ /**
+ * Whether to exclude publishers.
+ */
+ private final boolean excludePublishers;
+
+ /**
+ * Whether to exclude consumers.
+ */
+ private final boolean excludeConsumers;
+
+}
+```
+
+Implement the `getPartitionedStatsAsync` and `getStatsAsync` methods in `org.apache.pulsar.client.admin.internal.TopicsImpl`:
+```java
+@Override
+public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
+        String topic, boolean perPartition, GetStatsOptions getStatsOptions) {
+    TopicName tn = validateTopic(topic);
+    WebTarget path = topicPath(tn, "partitioned-stats");
+    // Forward every option, including the two new exclusion flags, as query parameters.
+    path = path.queryParam("perPartition", perPartition)
+            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
+            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
+            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
+            .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
+            .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
+    // Sending the GET request and returning the future is not shown in the proposal.
+}
+
+@Override
+public CompletableFuture<TopicStats> getStatsAsync(String topic, GetStatsOptions getStatsOptions) {
+    TopicName tn = validateTopic(topic);
+    WebTarget path = topicPath(tn, "stats")
+            .queryParam("getPreciseBacklog", getStatsOptions.isGetPreciseBacklog())
+            .queryParam("subscriptionBacklogSize", getStatsOptions.isSubscriptionBacklogSize())
+            .queryParam("getEarliestTimeInBacklog", getStatsOptions.isGetEarliestTimeInBacklog())
+            .queryParam("excludePublishers", getStatsOptions.isExcludePublishers())
+            .queryParam("excludeConsumers", getStatsOptions.isExcludeConsumers());
+    // Sending the GET request and returning the future is not shown in the proposal.
+}
+```
+
+Add the `excludePublishers` and `excludeConsumers` parameters to the `{tenant}/{namespace}/{topic}/partitioned-stats` API:
+```java
+@GET
+@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
+public void getPartitionedStats(
+        @Suspended final AsyncResponse asyncResponse,
+        @ApiParam(value = "Specify the tenant", required = true)
+        @PathParam("tenant") String tenant,
+        @ApiParam(value = "Specify the namespace", required = true)
+        @PathParam("namespace") String namespace,
+        @ApiParam(value = "Specify topic name", required = true)
+        @PathParam("topic") @Encoded String encodedTopic,
+        @ApiParam(value = "Get per partition stats")
+        @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
+        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+        @ApiParam(value = "If return precise backlog or imprecise backlog")
+        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                + "not to use when there's heavy traffic.")
+        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
+        @ApiParam(value = "If return the earliest time in backlog")
+        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
+        @ApiParam(value = "If exclude the publishers")
+        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
+        @ApiParam(value = "If exclude the consumers")
+        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
+
+```
+
+Add the `excludePublishers` and `excludeConsumers` parameters to the `{tenant}/{namespace}/{topic}/stats` API:
+```java
+@GET
+@Path("{tenant}/{namespace}/{topic}/stats")
+public void getStats(
+        @Suspended final AsyncResponse asyncResponse,
+        @ApiParam(value = "Specify the tenant", required = true)
+        @PathParam("tenant") String tenant,
+        @ApiParam(value = "Specify the namespace", required = true)
+        @PathParam("namespace") String namespace,
+        @ApiParam(value = "Specify topic name", required = true)
+        @PathParam("topic") @Encoded String encodedTopic,
+        @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
+        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+        @ApiParam(value = "If return precise backlog or imprecise backlog")
+        @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+        @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                + "not to use when there's heavy traffic.")
+        @QueryParam("subscriptionBacklogSize") @DefaultValue("true") boolean subscriptionBacklogSize,
+        @ApiParam(value = "If return time of the earliest message in backlog")
+        @QueryParam("getEarliestTimeInBacklog") @DefaultValue("false") boolean getEarliestTimeInBacklog,
+        @ApiParam(value = "If exclude the publishers")
+        @QueryParam("excludePublishers") @DefaultValue("false") boolean excludePublishers,
+        @ApiParam(value = "If exclude the consumers")
+        @QueryParam("excludeConsumers") @DefaultValue("false") boolean excludeConsumers)
+```
+
+
+Add parameters to `org.apache.pulsar.broker.service.Topic`:
+```java
+TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
+        boolean getEarliestTimeInBacklog, boolean excludePublishers, boolean excludeConsumers);
+
+CompletableFuture<? extends TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
Review Comment:
IMO, we only need to add the async method, which avoids blocking the thread.
For the method arguments, we can reuse the design above and move these
arguments into a class.
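
To make the suggestion concrete, here is a minimal sketch of what an async-only method taking an options object could look like. Every name in it (`StatsOptionsSketch`, `TopicStatsSketch`, `TopicSketch`, `InMemoryTopicSketch`) is a hypothetical stand-in rather than an actual Pulsar class, and the toy implementation only tracks counts to show the exclusion flags taking effect.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical options holder: one object replaces the five boolean arguments.
final class StatsOptionsSketch {
    final boolean getPreciseBacklog;
    final boolean subscriptionBacklogSize;
    final boolean getEarliestTimeInBacklog;
    final boolean excludePublishers;
    final boolean excludeConsumers;

    StatsOptionsSketch(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
                       boolean getEarliestTimeInBacklog, boolean excludePublishers,
                       boolean excludeConsumers) {
        this.getPreciseBacklog = getPreciseBacklog;
        this.subscriptionBacklogSize = subscriptionBacklogSize;
        this.getEarliestTimeInBacklog = getEarliestTimeInBacklog;
        this.excludePublishers = excludePublishers;
        this.excludeConsumers = excludeConsumers;
    }
}

// Hypothetical, heavily simplified stand-in for the stats result type.
final class TopicStatsSketch {
    final int publisherCount;
    final int consumerCount;

    TopicStatsSketch(int publisherCount, int consumerCount) {
        this.publisherCount = publisherCount;
        this.consumerCount = consumerCount;
    }
}

// Only the async variant is declared, so callers never block on stats collection.
interface TopicSketch {
    CompletableFuture<TopicStatsSketch> asyncGetStats(StatsOptionsSketch options);
}

// Toy implementation that exists only to make the sketch runnable.
final class InMemoryTopicSketch implements TopicSketch {
    private final int publishers;
    private final int consumers;

    InMemoryTopicSketch(int publishers, int consumers) {
        this.publishers = publishers;
        this.consumers = consumers;
    }

    @Override
    public CompletableFuture<TopicStatsSketch> asyncGetStats(StatsOptionsSketch options) {
        // Excluded sections are reported as empty instead of being collected.
        return CompletableFuture.supplyAsync(() -> new TopicStatsSketch(
                options.excludePublishers ? 0 : publishers,
                options.excludeConsumers ? 0 : consumers));
    }
}

class AsyncStatsSketch {
    public static void main(String[] args) {
        TopicSketch topic = new InMemoryTopicSketch(3, 2);
        StatsOptionsSketch options = new StatsOptionsSketch(false, true, false, true, false);
        TopicStatsSketch stats = topic.asyncGetStats(options).join();
        System.out.println(stats.publisherCount + " publishers, " + stats.consumerCount + " consumers");
    }
}
```

With this shape, adding another flag later only changes the options class, not every `asyncGetStats` signature and call site.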