poorbarcode commented on code in PR #24833: URL: https://github.com/apache/pulsar/pull/24833#discussion_r2445395653
########## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/TopicListMemoryLimiter.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.topiclistlimit; + +import io.netty.buffer.ByteBufUtil; +import io.opentelemetry.api.metrics.DoubleGauge; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongGauge; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Summary; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl; +import org.apache.pulsar.common.semaphore.AsyncSemaphore; + +/** + * Topic list memory limiter that exposes both Prometheus metrics and OpenTelemetry metrics. 
+ */ +@Slf4j +public class TopicListMemoryLimiter extends AsyncDualMemoryLimiterImpl { Review Comment: Do we need to indicate in this class name that this is a class related to metrics? ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java: ########## @@ -101,6 +112,7 @@ public void accept(String topicName, NotificationType notificationType) { private final boolean enableSubscriptionPatternEvaluation; private final int maxSubscriptionPatternLength; private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>> watchers; + private final Backoff retryBackoff; public TopicListService(PulsarService pulsar, ServerCnx connection, Review Comment: This function sends notifications to the client to let it know which topics have changed; it should not be limited by the new feature. The change at https://github.com/apache/pulsar/pull/24833/files#diff-5cb94e51fda5445d6fc829fda24eeccda65bf166643a7bf6957fa195516bca28L363-R392 is also related. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java: ########## @@ -165,8 +173,66 @@ protected CompletableFuture<Void> internalCreateNamespace(Policies policies) { .thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName)); } - protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies, + protected CompletableFuture<List<String>> internalGetListOfTopics(AsyncResponse response, Policies policies, + CommandGetTopicsOfNamespace.Mode mode) { + // Use maxTopicListInFlightLimiter to limit inflight get topic listing responses + // to avoid OOME caused by a lot of clients using HTTP service lookups to list topics + AsyncDualMemoryLimiterImpl maxTopicListInFlightLimiter = + pulsar().getBrokerService().getMaxTopicListInFlightLimiter(); + TopicListSizeResultCache.ResultHolder listSizeHolder = pulsar().getBrokerService().getTopicListSizeResultCache() + .getTopicListSize(namespaceName.toString(), mode); + // 
setup the permit cancellation function + AtomicBoolean permitRequestCancelled = new AtomicBoolean(false); + BooleanSupplier isPermitRequestCancelled = permitRequestCancelled::get; + // add callback that releases permits when the response completes + AtomicReference<AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit> initialPermitsRef = + new AtomicReference<>(); + AtomicReference<AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit> permitsRef = new AtomicReference<>(); + response.register(new CompletionCallback() { + @Override + public void onComplete(Throwable throwable) { + if (throwable != null) { + // for failed request + // handle resetting the TopicListSizeResultCache.ResultHolder + listSizeHolder.resetIfInitializing(); + // cancel any pending permit request + permitRequestCancelled.set(true); + } + AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit initialPermit = initialPermitsRef.get(); + if (initialPermit != null) { + maxTopicListInFlightLimiter.release(initialPermit); + } + AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit permits = permitsRef.get(); + if (permits != null) { + maxTopicListInFlightLimiter.release(permits); + } + } + }); + return listSizeHolder.getSizeAsync().thenCompose(initialSize -> maxTopicListInFlightLimiter.acquire(initialSize, + AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled).exceptionally(t -> { + throw new CompletionException( + new RestException(Status.TOO_MANY_REQUESTS, "Failed due to heap memory limit exceeded")); + }).thenCompose(initialPermits -> { + initialPermitsRef.set(initialPermits); + // perform the actual get list of topics operation + return doInternalGetListOfTopics(policies, mode).thenCompose(topicList -> { + long actualSize = TopicListMemoryLimiter.estimateTopicListSize(topicList); + listSizeHolder.updateSize(actualSize); + return maxTopicListInFlightLimiter.update(initialPermits, actualSize, isPermitRequestCancelled) Review Comment: The same comment as 
https://github.com/apache/pulsar/pull/24833#discussion_r2444987116 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
