poorbarcode commented on code in PR #24833:
URL: https://github.com/apache/pulsar/pull/24833#discussion_r2445395653


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/TopicListMemoryLimiter.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.topiclistlimit;
+
+import io.netty.buffer.ByteBufUtil;
+import io.opentelemetry.api.metrics.DoubleGauge;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongGauge;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
+
+/**
+ * Topic list memory limiter that exposes both Prometheus metrics and OpenTelemetry metrics.
+ */
+@Slf4j
+public class TopicListMemoryLimiter extends AsyncDualMemoryLimiterImpl {

Review Comment:
   Should the class name indicate that this class is related to metrics?
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -101,6 +112,7 @@ public void accept(String topicName, NotificationType notificationType) {
     private final boolean enableSubscriptionPatternEvaluation;
     private final int maxSubscriptionPatternLength;
     private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>> watchers;
+    private final Backoff retryBackoff;
 
 
     public TopicListService(PulsarService pulsar, ServerCnx connection,

Review Comment:
   This function notifies the client of which topics have changed, so it should not be limited by the new feature. The change at https://github.com/apache/pulsar/pull/24833/files#diff-5cb94e51fda5445d6fc829fda24eeccda65bf166643a7bf6957fa195516bca28L363-R392 is also related.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -165,8 +173,66 @@ protected CompletableFuture<Void> 
internalCreateNamespace(Policies policies) {
                 .thenAccept(__ -> log.info("[{}] Created namespace {}", 
clientAppId(), namespaceName));
     }
 
-    protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
+    protected CompletableFuture<List<String>> internalGetListOfTopics(AsyncResponse response, Policies policies,
                                                                       CommandGetTopicsOfNamespace.Mode mode) {
+        // Use maxTopicListInFlightLimiter to limit inflight get topic listing responses
+        // to avoid OOME caused by a lot of clients using HTTP service lookups to list topics
+        AsyncDualMemoryLimiterImpl maxTopicListInFlightLimiter =
+                pulsar().getBrokerService().getMaxTopicListInFlightLimiter();
+        TopicListSizeResultCache.ResultHolder listSizeHolder = pulsar().getBrokerService().getTopicListSizeResultCache()
+                .getTopicListSize(namespaceName.toString(), mode);
+        // setup the permit cancellation function
+        AtomicBoolean permitRequestCancelled = new AtomicBoolean(false);
+        BooleanSupplier isPermitRequestCancelled = permitRequestCancelled::get;
+        // add callback that releases permits when the response completes
+        AtomicReference<AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit> initialPermitsRef =
+                new AtomicReference<>();
+        AtomicReference<AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit> permitsRef = new AtomicReference<>();
+        response.register(new CompletionCallback() {
+            @Override
+            public void onComplete(Throwable throwable) {
+                if (throwable != null) {
+                    // for failed request
+                    // handle resetting the TopicListSizeResultCache.ResultHolder
+                    listSizeHolder.resetIfInitializing();
+                    // cancel any pending permit request
+                    permitRequestCancelled.set(true);
+                }
+                AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit initialPermit = initialPermitsRef.get();
+                if (initialPermit != null) {
+                    maxTopicListInFlightLimiter.release(initialPermit);
+                }
+                AsyncDualMemoryLimiter.AsyncDualMemoryLimiterPermit permits = permitsRef.get();
+                if (permits != null) {
+                    maxTopicListInFlightLimiter.release(permits);
+                }
+            }
+        });
+        return listSizeHolder.getSizeAsync().thenCompose(initialSize -> maxTopicListInFlightLimiter.acquire(initialSize,
+                AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled).exceptionally(t -> {
+            throw new CompletionException(
+                    new RestException(Status.TOO_MANY_REQUESTS, "Failed due to heap memory limit exceeded"));
+        }).thenCompose(initialPermits -> {
+            initialPermitsRef.set(initialPermits);
+            // perform the actual get list of topics operation
+            return doInternalGetListOfTopics(policies, mode).thenCompose(topicList -> {
+                long actualSize = TopicListMemoryLimiter.estimateTopicListSize(topicList);
+                listSizeHolder.updateSize(actualSize);
+                return maxTopicListInFlightLimiter.update(initialPermits, actualSize, isPermitRequestCancelled)

Review Comment:
   The same comment as 
https://github.com/apache/pulsar/pull/24833#discussion_r2444987116
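
The NamespacesBase hunk above follows an estimate, acquire, measure, update, release pattern: it acquires a permit sized from a cached size estimate, lists the topics, resizes the permit to the measured size, and releases permits from a JAX-RS completion callback, using a cancellation flag for requests that fail while still waiting. The Java sketch below is a simplified, single-pool illustration of that pattern under stated assumptions. `SketchMemoryLimiter`, `Permit`, and the release-then-reacquire `update` are hypothetical names and semantics, not Pulsar's `AsyncDualMemoryLimiterImpl` API, whose internals are not shown in this diff.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified limiter for illustration only; it is NOT Pulsar's
// AsyncDualMemoryLimiterImpl and tracks a single memory pool.
class SketchMemoryLimiter {
    // A permit remembers how many bytes it holds; 0 means "already released".
    static final class Permit {
        private long bytes;
        private Permit(long bytes) { this.bytes = bytes; }
        long bytes() { return bytes; }
    }

    // A queued request waiting for capacity.
    private static final class Waiter {
        final long bytes;
        final BooleanSupplier cancelled;
        final CompletableFuture<Permit> future = new CompletableFuture<>();
        Waiter(long bytes, BooleanSupplier cancelled) {
            this.bytes = bytes;
            this.cancelled = cancelled;
        }
    }

    private final long capacity;
    private final Deque<Waiter> waiters = new ArrayDeque<>();
    private long used;

    SketchMemoryLimiter(long capacity) { this.capacity = capacity; }

    synchronized long used() { return used; }

    // Grants the permit now if capacity allows, otherwise queues it in FIFO order.
    CompletableFuture<Permit> acquire(long bytes, BooleanSupplier cancelled) {
        if (bytes < 0 || bytes > capacity) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("invalid permit size: " + bytes));
        }
        Waiter waiter = new Waiter(bytes, cancelled);
        List<Runnable> completions;
        synchronized (this) {
            waiters.addLast(waiter);
            completions = drainLocked();
        }
        completions.forEach(Runnable::run); // complete futures outside the lock
        return waiter.future;
    }

    // Idempotent: releasing an already-released permit is a no-op.
    void release(Permit permit) {
        List<Runnable> completions;
        synchronized (this) {
            if (permit.bytes == 0) {
                return;
            }
            used -= permit.bytes;
            permit.bytes = 0;
            completions = drainLocked();
        }
        completions.forEach(Runnable::run);
    }

    // Resizes a permit to the measured size by releasing and re-acquiring it;
    // simpler than an in-place resize, and not atomic with respect to other callers.
    CompletableFuture<Permit> update(Permit permit, long newBytes, BooleanSupplier cancelled) {
        release(permit);
        return acquire(newBytes, cancelled);
    }

    // Caller must hold the lock. Drops cancelled waiters and grants head-of-queue requests that fit.
    private List<Runnable> drainLocked() {
        List<Runnable> completions = new ArrayList<>();
        while (!waiters.isEmpty()) {
            Waiter head = waiters.peekFirst();
            if (head.cancelled.getAsBoolean()) {
                waiters.pollFirst();
                completions.add(() -> head.future.cancel(false));
            } else if (used + head.bytes <= capacity) {
                waiters.pollFirst();
                used += head.bytes;
                Permit permit = new Permit(head.bytes);
                completions.add(() -> head.future.complete(permit));
            } else {
                break; // head-of-line request waits until enough bytes are released
            }
        }
        return completions;
    }
}
```

One design choice worth noting: because `release` is idempotent in this sketch, a completion callback can unconditionally release both the initial permit and the updated permit, as the callback in the hunk does, without double-counting memory. Whether Pulsar's limiter relies on the same property is not visible in this diff.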


