poorbarcode commented on code in PR #24833:
URL: https://github.com/apache/pulsar/pull/24833#discussion_r2444987116
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2610,6 +2581,93 @@ protected void
handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
}
}
+ private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Semaphore lookupSemaphore)
{
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder
+ listSizeHolder =
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
+ return
getBrokerService().pulsar().getNamespaceService()
+ .getListOfUserTopics(namespaceName, mode)
+ .thenAccept(topics -> {
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(topics);
+ listSizeHolder.updateSize(actualSize);
+
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
Review Comment:
Instead of updating and acquiring additional permits if the actual size is
larger than estimated, we'd better improve as follows:
- The current request does not need to acquire permits again; only updating
the permits that were borrowed out is enough, even though the permits were
over-acquired
- Updates the permits that were borrowed out, to limit the following permits
acquisition;
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2610,6 +2581,93 @@ protected void
handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
}
}
+ private void internalHandleGetTopicsOfNamespace(String namespace,
NamespaceName namespaceName, long requestId,
+
CommandGetTopicsOfNamespace.Mode mode,
+ Optional<String>
topicsPattern, Optional<String> topicsHash,
+ Semaphore lookupSemaphore)
{
+ BooleanSupplier isPermitRequestCancelled = () ->
!ctx().channel().isActive();
+ TopicListSizeResultCache.ResultHolder
+ listSizeHolder =
service.getTopicListSizeResultCache().getTopicListSize(namespaceName.toString(),
mode);
+ listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
+ maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled, initialPermits -> {
Review Comment:
Just add a tip: if the data does not exist in the cache, it may query ZK,
which will also use direct memory
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]