lhotari commented on code in PR #24833:
URL: https://github.com/apache/pulsar/pull/24833#discussion_r2466068257
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -165,8 +173,66 @@ protected CompletableFuture<Void>
internalCreateNamespace(Policies policies) {
.thenAccept(__ -> log.info("[{}] Created namespace {}",
clientAppId(), namespaceName));
}
- protected CompletableFuture<List<String>> internalGetListOfTopics(Policies
policies,
+ protected CompletableFuture<List<String>>
internalGetListOfTopics(AsyncResponse response, Policies policies,
CommandGetTopicsOfNamespace.Mode mode) {
+ // Use maxTopicListInFlightLimiter to limit inflight get topic listing responses
+ // to avoid OOME caused by a lot of clients using HTTP service lookups to list topics
+ AsyncDualMemoryLimiterImpl maxTopicListInFlightLimiter =
+ pulsar().getBrokerService().getMaxTopicListInFlightLimiter();
+ TopicListSizeResultCache.ResultHolder listSizeHolder =
pulsar().getBrokerService().getTopicListSizeResultCache()
+ .getTopicListSize(namespaceName.toString(), mode);
+ // setup the permit cancellation function
+ AtomicBoolean permitRequestCancelled = new AtomicBoolean(false);
+ BooleanSupplier isPermitRequestCancelled = permitRequestCancelled::get;
+ // add callback that releases permits when the response completes
+ AtomicReference<AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit>
initialPermitsRef =
+ new AtomicReference<>();
+ AtomicReference<AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit>
permitsRef = new AtomicReference<>();
+ response.register(new CompletionCallback() {
+ @Override
+ public void onComplete(Throwable throwable) {
+ if (throwable != null) {
+ // for failed request
+ // handle resetting the TopicListSizeResultCache.ResultHolder
+ listSizeHolder.resetIfInitializing();
+ // cancel any pending permit request
+ permitRequestCancelled.set(true);
+ }
+ AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit
initialPermit = initialPermitsRef.get();
+ if (initialPermit != null) {
+ maxTopicListInFlightLimiter.release(initialPermit);
+ }
+ AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit permits =
permitsRef.get();
+ if (permits != null) {
+ maxTopicListInFlightLimiter.release(permits);
+ }
+ }
+ });
+ return listSizeHolder.getSizeAsync().thenCompose(initialSize ->
maxTopicListInFlightLimiter.acquire(initialSize,
+ AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY,
isPermitRequestCancelled).exceptionally(t -> {
+ throw new CompletionException(
+ new RestException(Status.TOO_MANY_REQUESTS, "Failed due to
heap memory limit exceeded"));
+ }).thenCompose(initialPermits -> {
+ initialPermitsRef.set(initialPermits);
+ // perform the actual get list of topics operation
+ return doInternalGetListOfTopics(policies,
mode).thenCompose(topicList -> {
+ long actualSize =
TopicListMemoryLimiter.estimateTopicListSize(topicList);
+ listSizeHolder.updateSize(actualSize);
+ return maxTopicListInFlightLimiter.update(initialPermits,
actualSize, isPermitRequestCancelled)
Review Comment:
similar response.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]