Denovo1998 commented on code in PR #24833:
URL: https://github.com/apache/pulsar/pull/24833#discussion_r2429175692


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/TopicListMemoryLimiter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.topiclistlimit;
+
+import io.opentelemetry.api.metrics.DoubleGauge;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongGauge;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
+
+/**
+ * Topic list memory limiter that exposes both Prometheus metrics and 
OpenTelemetry metrics.
+ */
+@Slf4j
+public class TopicListMemoryLimiter extends AsyncDualMemoryLimiterImpl {
+    private final CollectorRegistry collectorRegistry;
+    private final Gauge heapMemoryUsedBytes;
+    private final Gauge heapMemoryLimitBytes;
+    private final Gauge directMemoryUsedBytes;
+    private final Gauge directMemoryLimitBytes;
+    private final Gauge heapQueueSize;
+    private final Gauge heapQueueMaxSize;
+    private final Gauge directQueueSize;
+    private final Gauge directQueueMaxSize;
+    private final Summary heapWaitTimeMs;
+    private final Summary directWaitTimeMs;
+    private final Counter heapTimeoutTotal;
+    private final Counter directTimeoutTotal;
+    private final ObservableDoubleGauge otelHeapMemoryUsedGauge;
+    private final DoubleGauge otelHeapMemoryLimitGauge;
+    private final ObservableDoubleGauge otelDirectMemoryUsedGauge;
+    private final DoubleGauge otelDirectMemoryLimitGauge;
+    private final ObservableLongUpDownCounter otelHeapQueueSize;
+    private final ObservableLongUpDownCounter otelDirectQueueSize;
+    private final DoubleHistogram otelHeapWaitTime;
+    private final DoubleHistogram otelDirectWaitTime;
+    private final LongCounter otelHeapTimeoutTotal;
+    private final LongCounter otelDirectTimeoutTotal;
+
+    public TopicListMemoryLimiter(CollectorRegistry collectorRegistry, String 
prometheusPrefix,
+                                  Meter openTelemetryMeter,
+                                  long maxHeapMemory, int maxHeapQueueSize,
+                                  long heapTimeoutMillis, long 
maxDirectMemory, int maxDirectQueueSize,
+                                  long directTimeoutMillis) {
+        super(maxHeapMemory, maxHeapQueueSize, heapTimeoutMillis, 
maxDirectMemory, maxDirectQueueSize,
+                directTimeoutMillis);
+        this.collectorRegistry = collectorRegistry;
+
+        AsyncSemaphore heapMemoryLimiter = getLimiter(LimitType.HEAP_MEMORY);
+        AsyncSemaphore directMemoryLimiter = 
getLimiter(LimitType.DIRECT_MEMORY);
+
+        this.heapMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_used_bytes",
+                        "Current heap memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelHeapMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.used")
+                .setUnit("By")
+                .setDescription("Current heap memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(heapMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.heapMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_limit_bytes",
+                        "Configured heap memory limit")
+                .create());
+        this.heapMemoryLimitBytes.set(maxHeapMemory);
+        this.otelHeapMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured heap memory limit")
+                .build();
+        this.otelHeapMemoryLimitGauge.set(maxHeapMemory);
+
+        this.directMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_used_bytes",
+                        "Current direct memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return directMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelDirectMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.used")
+                .setUnit("By")
+                .setDescription("Current direct memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(directMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.directMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_limit_bytes",
+                        "Configured direct memory limit")
+                .create());
+        this.directMemoryLimitBytes.set(maxDirectMemory);
+        this.otelDirectMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured direct memory limit")
+                .build();
+        this.otelDirectMemoryLimitGauge.set(maxDirectMemory);
+
+        this.heapQueueSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_size",
+                        "Current heap memory limiter queue size")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getQueueSize();
+                    }
+                }));
+        this.otelHeapQueueSize = openTelemetryMeter
+                .upDownCounterBuilder("topic.list.heap.queue.size")
+                .setDescription("Current heap memory limiter queue size")
+                .setUnit("1")
+                .buildWithCallback(observableLongMeasurement -> {
+                    
observableLongMeasurement.record(heapMemoryLimiter.getQueueSize());
+                });
+
+        this.heapQueueMaxSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_max_size",
+                        "Maximum heap memory limiter queue size")
+                .create());
+        this.heapQueueMaxSize.set(maxHeapQueueSize);
+        LongGauge otelHeapQueueMaxSize = openTelemetryMeter
+                .gaugeBuilder("topic.list.heap.queue.max.size")
+                .setDescription("Maximum heap memory limiter queue size")
+                .setUnit("1")
+                .ofLongs()
+                .build();
+        otelHeapQueueMaxSize.set(maxHeapQueueSize);
+
+        this.directQueueSize = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_queue_size",
+                        "Current direct memory limiter queue size")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return directMemoryLimiter.getQueueSize();
+                    }
+                }));
+        this.otelDirectQueueSize = openTelemetryMeter
+                .upDownCounterBuilder("topic.list.direct.queue.size")
+                .setDescription("Current direct memory limiter queue size")
+                .setUnit("1")
+                .buildWithCallback(observableLongMeasurement -> {
+                    
observableLongMeasurement.record(directMemoryLimiter.getQueueSize());
+                });
+
+        this.directQueueMaxSize = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_queue_max_size",
+                        "Maximum direct memory limiter queue size")
+                .create());
+        this.directQueueMaxSize.set(maxDirectQueueSize);
+        LongGauge otelDirectQueueMaxSize = openTelemetryMeter
+                .gaugeBuilder("topic.list.direct.queue.max.size")
+                .setDescription("Maximum direct memory limiter queue size")
+                .setUnit("1")
+                .ofLongs()
+                .build();
+        otelDirectQueueMaxSize.set(maxDirectQueueSize);
+
+        this.heapWaitTimeMs = register(Summary.build(prometheusPrefix + 
"topic_list_heap_wait_time_ms",
+                        "Wait time for heap memory permits")
+                .quantile(0.50, 0.01)
+                .quantile(0.95, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(1, 0.01)
+                .create());
+        this.otelHeapWaitTime = 
openTelemetryMeter.histogramBuilder("topic.list.heap.wait.time.ms")
+                .setUnit("s")
+                .setDescription("Wait time for heap memory permits")
+                .build();
+
+        this.directWaitTimeMs = register(Summary.build(prometheusPrefix + 
"topic_list_direct_wait_time_ms",
+                        "Wait time for direct memory permits")
+                .quantile(0.50, 0.01)
+                .quantile(0.95, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(1, 0.01)
+                .create());
+        this.otelDirectWaitTime = 
openTelemetryMeter.histogramBuilder("topic.list.direct.wait.time.ms")
+                .setUnit("s")
+                .setDescription("Wait time for direct memory permits")
+                .build();
+
+        this.heapTimeoutTotal = register(Counter.build(prometheusPrefix + 
"topic_list_heap_timeout_total",
+                        "Total heap memory permit timeouts")
+                .create());
+        this.otelHeapTimeoutTotal = 
openTelemetryMeter.counterBuilder("topic.list.heap.timeout.total")
+                .setDescription("Total heap memory permit timeouts")
+                .setUnit("1")
+                .build();
+
+        this.directTimeoutTotal = register(Counter.build(prometheusPrefix + 
"topic_list_direct_timeout_total",
+                        "Total direct memory permit timeouts")
+                .create());
+        this.otelDirectTimeoutTotal = 
openTelemetryMeter.counterBuilder("topic.list.direct.timeout.total")
+                .setDescription("Total direct memory permit timeouts")
+                .setUnit("1")
+                .build();
+    }
+
+    private <T extends Collector> T register(T collector) {
+        try {
+            collectorRegistry.register(collector);
+        } catch (Exception e) {
+            // ignore exception when registering a collector that is already 
registered
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to register Prometheus collector {}", 
collector, e);
+            }
+        }
+        return collector;
+    }
+
+    private void unregister(Collector collector) {
+        try {
+            collectorRegistry.unregister(collector);
+        } catch (Exception e) {
+            // ignore exception when unregistering a collector that is not 
registered
+            if (log.isDebugEnabled()) {
+                log.debug("Failed to unregister Prometheus collector {}", 
collector, e);
+            }
+        }
+    }
+
+
+    @Override
+    protected void recordHeapWaitTime(long waitTimeNanos) {
+        if (waitTimeNanos == Long.MAX_VALUE) {
+            heapTimeoutTotal.inc();
+            otelHeapTimeoutTotal.add(1);
+        } else {
+            
heapWaitTimeMs.observe(TimeUnit.NANOSECONDS.toMillis(waitTimeNanos));
+            otelHeapWaitTime.record(waitTimeNanos / 1_000_000_000.0d);
+        }
+    }
+
+    @Override
+    protected void recordDirectWaitTime(long waitTimeNanos) {
+        if (waitTimeNanos == Long.MAX_VALUE) {
+            directTimeoutTotal.inc();
+            otelDirectTimeoutTotal.add(1);
+        } else {
+            
directWaitTimeMs.observe(TimeUnit.NANOSECONDS.toMillis(waitTimeNanos));
+            otelDirectWaitTime.record(waitTimeNanos / 1_000_000_000.0d);
+        }
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        unregister(heapMemoryUsedBytes);
+        unregister(heapMemoryLimitBytes);
+        unregister(directMemoryUsedBytes);
+        unregister(directMemoryLimitBytes);
+        unregister(heapQueueSize);
+        unregister(heapQueueMaxSize);
+        unregister(directQueueSize);
+        unregister(directQueueMaxSize);
+        unregister(heapWaitTimeMs);
+        unregister(directWaitTimeMs);
+        unregister(heapTimeoutTotal);
+        unregister(directTimeoutTotal);
+        otelHeapMemoryUsedGauge.close();
+        otelDirectMemoryUsedGauge.close();
+        otelHeapQueueSize.close();

Review Comment:
   Close all OpenTelemetry instruments to prevent resource leaks.
   ```java
   otelHeapMemoryLimitGauge.close();
   otelDirectMemoryLimitGauge.close();
   otelHeapQueueMaxSize.close();
   otelDirectQueueMaxSize.close();
   otelHeapWaitTime.close();
   otelDirectWaitTime.close();
   otelHeapTimeoutTotal.close();
   otelDirectTimeoutTotal.close();
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/TopicListMemoryLimiter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.topiclistlimit;
+
+import io.opentelemetry.api.metrics.DoubleGauge;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongGauge;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
+
+/**
+ * Topic list memory limiter that exposes both Prometheus metrics and 
OpenTelemetry metrics.
+ */
+@Slf4j
+public class TopicListMemoryLimiter extends AsyncDualMemoryLimiterImpl {
+    private final CollectorRegistry collectorRegistry;
+    private final Gauge heapMemoryUsedBytes;
+    private final Gauge heapMemoryLimitBytes;
+    private final Gauge directMemoryUsedBytes;
+    private final Gauge directMemoryLimitBytes;
+    private final Gauge heapQueueSize;
+    private final Gauge heapQueueMaxSize;
+    private final Gauge directQueueSize;
+    private final Gauge directQueueMaxSize;
+    private final Summary heapWaitTimeMs;
+    private final Summary directWaitTimeMs;
+    private final Counter heapTimeoutTotal;
+    private final Counter directTimeoutTotal;
+    private final ObservableDoubleGauge otelHeapMemoryUsedGauge;
+    private final DoubleGauge otelHeapMemoryLimitGauge;
+    private final ObservableDoubleGauge otelDirectMemoryUsedGauge;
+    private final DoubleGauge otelDirectMemoryLimitGauge;
+    private final ObservableLongUpDownCounter otelHeapQueueSize;
+    private final ObservableLongUpDownCounter otelDirectQueueSize;
+    private final DoubleHistogram otelHeapWaitTime;
+    private final DoubleHistogram otelDirectWaitTime;
+    private final LongCounter otelHeapTimeoutTotal;
+    private final LongCounter otelDirectTimeoutTotal;
+
+    public TopicListMemoryLimiter(CollectorRegistry collectorRegistry, String 
prometheusPrefix,
+                                  Meter openTelemetryMeter,
+                                  long maxHeapMemory, int maxHeapQueueSize,
+                                  long heapTimeoutMillis, long 
maxDirectMemory, int maxDirectQueueSize,
+                                  long directTimeoutMillis) {
+        super(maxHeapMemory, maxHeapQueueSize, heapTimeoutMillis, 
maxDirectMemory, maxDirectQueueSize,
+                directTimeoutMillis);
+        this.collectorRegistry = collectorRegistry;
+
+        AsyncSemaphore heapMemoryLimiter = getLimiter(LimitType.HEAP_MEMORY);
+        AsyncSemaphore directMemoryLimiter = 
getLimiter(LimitType.DIRECT_MEMORY);
+
+        this.heapMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_used_bytes",
+                        "Current heap memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelHeapMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.used")
+                .setUnit("By")
+                .setDescription("Current heap memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(heapMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.heapMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_limit_bytes",
+                        "Configured heap memory limit")
+                .create());
+        this.heapMemoryLimitBytes.set(maxHeapMemory);
+        this.otelHeapMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured heap memory limit")
+                .build();
+        this.otelHeapMemoryLimitGauge.set(maxHeapMemory);
+
+        this.directMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_used_bytes",
+                        "Current direct memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return directMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelDirectMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.used")
+                .setUnit("By")
+                .setDescription("Current direct memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(directMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.directMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_limit_bytes",
+                        "Configured direct memory limit")
+                .create());
+        this.directMemoryLimitBytes.set(maxDirectMemory);
+        this.otelDirectMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured direct memory limit")
+                .build();
+        this.otelDirectMemoryLimitGauge.set(maxDirectMemory);
+
+        this.heapQueueSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_size",
+                        "Current heap memory limiter queue size")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getQueueSize();
+                    }
+                }));
+        this.otelHeapQueueSize = openTelemetryMeter
+                .upDownCounterBuilder("topic.list.heap.queue.size")
+                .setDescription("Current heap memory limiter queue size")
+                .setUnit("1")
+                .buildWithCallback(observableLongMeasurement -> {
+                    
observableLongMeasurement.record(heapMemoryLimiter.getQueueSize());
+                });
+
+        this.heapQueueMaxSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_max_size",
+                        "Maximum heap memory limiter queue size")
+                .create());
+        this.heapQueueMaxSize.set(maxHeapQueueSize);
+        LongGauge otelHeapQueueMaxSize = openTelemetryMeter
+                .gaugeBuilder("topic.list.heap.queue.max.size")
+                .setDescription("Maximum heap memory limiter queue size")
+                .setUnit("1")
+                .ofLongs()
+                .build();
+        otelHeapQueueMaxSize.set(maxHeapQueueSize);
+
+        this.directQueueSize = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_queue_size",
+                        "Current direct memory limiter queue size")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return directMemoryLimiter.getQueueSize();
+                    }
+                }));
+        this.otelDirectQueueSize = openTelemetryMeter
+                .upDownCounterBuilder("topic.list.direct.queue.size")
+                .setDescription("Current direct memory limiter queue size")
+                .setUnit("1")
+                .buildWithCallback(observableLongMeasurement -> {
+                    
observableLongMeasurement.record(directMemoryLimiter.getQueueSize());
+                });
+
+        this.directQueueMaxSize = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_queue_max_size",
+                        "Maximum direct memory limiter queue size")
+                .create());
+        this.directQueueMaxSize.set(maxDirectQueueSize);
+        LongGauge otelDirectQueueMaxSize = openTelemetryMeter
+                .gaugeBuilder("topic.list.direct.queue.max.size")
+                .setDescription("Maximum direct memory limiter queue size")
+                .setUnit("1")
+                .ofLongs()
+                .build();
+        otelDirectQueueMaxSize.set(maxDirectQueueSize);
+
+        this.heapWaitTimeMs = register(Summary.build(prometheusPrefix + 
"topic_list_heap_wait_time_ms",
+                        "Wait time for heap memory permits")
+                .quantile(0.50, 0.01)
+                .quantile(0.95, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(1, 0.01)
+                .create());
+        this.otelHeapWaitTime = 
openTelemetryMeter.histogramBuilder("topic.list.heap.wait.time.ms")
+                .setUnit("s")
+                .setDescription("Wait time for heap memory permits")
+                .build();
+
+        this.directWaitTimeMs = register(Summary.build(prometheusPrefix + 
"topic_list_direct_wait_time_ms",
+                        "Wait time for direct memory permits")
+                .quantile(0.50, 0.01)
+                .quantile(0.95, 0.01)
+                .quantile(0.99, 0.01)
+                .quantile(1, 0.01)
+                .create());
+        this.otelDirectWaitTime = 
openTelemetryMeter.histogramBuilder("topic.list.direct.wait.time.ms")
+                .setUnit("s")

Review Comment:
   The Prometheus metric name uses _wait_time_ms (milliseconds) but the 
OpenTelemetry histogram unit is set to "s" (seconds).
   While the actual recording in recordDirectWaitTime correctly converts to 
seconds, the metric name .wait.time.ms suggests milliseconds, which is 
confusing.



##########
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphore.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.semaphore;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+
+/**
+ * An abstraction for a generic asynchronous semaphore.
+ */
+public interface AsyncSemaphore {
+    /**
+     * Acquire permits from the semaphore.
+     * Returned future completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireTimeoutException`



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureMultipleConsumersTest.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.stats.JvmMetrics;
+import org.apache.pulsar.common.util.DirectMemoryUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PatternConsumerBackPressureMultipleConsumersTest extends 
MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        isTcpLookup = true;
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 60 * 1000)
+    public void testGetTopicsWithLargeAmountOfConcurrentClientConnections()
+            throws PulsarAdminException, InterruptedException, IOException {
+        // number of requests to send to the broker
+        final int requests = 500;
+        // use multiple clients so that each client has a separate connection 
to the broker
+        final int numberOfClients = 200;
+        // create a long topic name to consume more memory per topic
+        final String topicName = StringUtils.repeat('a', 512) + 
UUID.randomUUID();
+        // number of topics to create
+        final int topicCount = 8192;
+        // maximum number of requests in flight at any given time
+        final int maxRequestsInFlight = 200;
+
+        // create a single topic with multiple partitions
+        admin.topics().createPartitionedTopic(topicName, topicCount);
+
+        // reduce available direct memory to reproduce issues with less 
concurrency
+        long directMemoryRequired = getDirectMemoryRequiredMB() * 1024 * 1024;
+        List<ByteBuf> buffers = allocateDirectMemory(directMemoryRequired);
+        @Cleanup
+        Closeable releaseBuffers = () -> {
+            for (ByteBuf byteBuf : buffers) {
+                byteBuf.release();
+            }
+        };
+
+        @Cleanup("shutdownNow")
+        final ExecutorService executorService = 
Executors.newFixedThreadPool(Runtime.getRuntime()
+                .availableProcessors());
+
+        @Cleanup
+        PulsarClientSharedResources sharedResources =
+                PulsarClientSharedResources.builder().build();
+        List<PulsarClientImpl> clients = new ArrayList<>(requests);
+        @Cleanup
+        Closeable closeClients = () -> {
+            for (PulsarClient client : clients) {
+                try {
+                    client.close();
+                } catch (PulsarClientException e) {
+                    log.error("Failed to close client {}", client, e);
+                }
+            }
+        };
+        for (int i = 0; i < numberOfClients; i++) {
+            PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+                    .serviceUrl(getClientServiceUrl())
+                    .sharedResources(sharedResources)
+                    .build();
+            clients.add(client);
+        }
+
+        final AtomicInteger success = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(requests);
+        final Semaphore semaphore = new Semaphore(maxRequestsInFlight);
+        for (int i = 0; i < requests; i++) {
+            PulsarClientImpl pulsarClientImpl = clients.get(i % 
numberOfClients);
+            executorService.execute(() -> {
+                semaphore.acquireUninterruptibly();
+                try {
+                    pulsarClientImpl.getLookup()
+                            
.getTopicsUnderNamespace(NamespaceName.get("public", "default"),
+                                    
CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", "")
+                            .whenComplete((result, ex) -> {
+                                semaphore.release();
+                                if (ex == null) {
+                                    success.incrementAndGet();
+                                } else {
+                                    log.error("Failed to get topic list.", ex);
+                                }
+                                log.info("latch-count: {}, succeed: {}, 
available direct mem: {} MB", latch.getCount(),
+                                        success.get(),
+                                        
(DirectMemoryUtils.jvmMaxDirectMemory() - JvmMetrics.getJvmDirectMemoryUsed())
+                                                / (1024 * 1024));
+                                latch.countDown();
+                            });
+                } catch (Exception e) {
+                    semaphore.release();

Review Comment:
   Should add this?
   ```java
   latch.countDown();
   log.error("Failed to execute getTopicsUnderNamespace request.", e);
   ```



##########
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphore.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.semaphore;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+
+/**
+ * An abstraction for a generic asynchronous semaphore.
+ */
+public interface AsyncSemaphore {
+    /**
+     * Acquire permits from the semaphore.
+     * Returned future completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     *
+     * @param permits     number of permits to acquire
+     * @param isCancelled supplier that returns true if acquisition should be 
cancelled
+     * @return CompletableFuture that completes with permit when available
+     */
+    CompletableFuture<AsyncSemaphorePermit> acquire(long permits, 
BooleanSupplier isCancelled);
+
+    /**
+     * Acquire or release permits for previously acquired permits by updating 
the permits.
+     * Returns a future that completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireQueueFullException`



##########
pip/pip-442.md:
##########
@@ -132,24 +134,48 @@ public interface AsyncSemaphore {
      * Returned future completes when permits are available.
      * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
      * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     *
+     * @param permits     number of permits to acquire
+     * @param isCancelled supplier that returns true if acquisition should be 
cancelled
      * @return CompletableFuture that completes with permit when available
      */
-    CompletableFuture<AsyncSemaphorePermitResult> acquire(long permits);
+    CompletableFuture<AsyncSemaphorePermit> acquire(long permits, 
BooleanSupplier isCancelled);
 
     /**
      * Acquire or release permits for previously acquired permits by updating 
the permits.
      * Returns a future that completes when permits are available.
      * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
      * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireQueueFullException`



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/TopicListMemoryLimiter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.topiclistlimit;
+
+import io.opentelemetry.api.metrics.DoubleGauge;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongGauge;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
+
+/**
+ * Topic list memory limiter that exposes both Prometheus metrics and 
OpenTelemetry metrics.
+ */
+@Slf4j
+public class TopicListMemoryLimiter extends AsyncDualMemoryLimiterImpl {
+    private final CollectorRegistry collectorRegistry;
+    private final Gauge heapMemoryUsedBytes;
+    private final Gauge heapMemoryLimitBytes;
+    private final Gauge directMemoryUsedBytes;
+    private final Gauge directMemoryLimitBytes;
+    private final Gauge heapQueueSize;
+    private final Gauge heapQueueMaxSize;
+    private final Gauge directQueueSize;
+    private final Gauge directQueueMaxSize;
+    private final Summary heapWaitTimeMs;
+    private final Summary directWaitTimeMs;
+    private final Counter heapTimeoutTotal;
+    private final Counter directTimeoutTotal;
+    private final ObservableDoubleGauge otelHeapMemoryUsedGauge;
+    private final DoubleGauge otelHeapMemoryLimitGauge;
+    private final ObservableDoubleGauge otelDirectMemoryUsedGauge;
+    private final DoubleGauge otelDirectMemoryLimitGauge;
+    private final ObservableLongUpDownCounter otelHeapQueueSize;
+    private final ObservableLongUpDownCounter otelDirectQueueSize;
+    private final DoubleHistogram otelHeapWaitTime;
+    private final DoubleHistogram otelDirectWaitTime;
+    private final LongCounter otelHeapTimeoutTotal;
+    private final LongCounter otelDirectTimeoutTotal;
+
+    public TopicListMemoryLimiter(CollectorRegistry collectorRegistry, String 
prometheusPrefix,
+                                  Meter openTelemetryMeter,
+                                  long maxHeapMemory, int maxHeapQueueSize,
+                                  long heapTimeoutMillis, long 
maxDirectMemory, int maxDirectQueueSize,
+                                  long directTimeoutMillis) {
+        super(maxHeapMemory, maxHeapQueueSize, heapTimeoutMillis, 
maxDirectMemory, maxDirectQueueSize,
+                directTimeoutMillis);
+        this.collectorRegistry = collectorRegistry;
+
+        AsyncSemaphore heapMemoryLimiter = getLimiter(LimitType.HEAP_MEMORY);
+        AsyncSemaphore directMemoryLimiter = 
getLimiter(LimitType.DIRECT_MEMORY);
+
+        this.heapMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_used_bytes",
+                        "Current heap memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelHeapMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.used")
+                .setUnit("By")
+                .setDescription("Current heap memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(heapMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.heapMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_limit_bytes",
+                        "Configured heap memory limit")
+                .create());
+        this.heapMemoryLimitBytes.set(maxHeapMemory);
+        this.otelHeapMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured heap memory limit")
+                .build();
+        this.otelHeapMemoryLimitGauge.set(maxHeapMemory);
+
+        this.directMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_used_bytes",
+                        "Current direct memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return directMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelDirectMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.used")
+                .setUnit("By")
+                .setDescription("Current direct memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(directMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.directMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_limit_bytes",
+                        "Configured direct memory limit")
+                .create());
+        this.directMemoryLimitBytes.set(maxDirectMemory);
+        this.otelDirectMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured direct memory limit")
+                .build();
+        this.otelDirectMemoryLimitGauge.set(maxDirectMemory);
+
+        this.heapQueueSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_size",
+                        "Current heap memory limiter queue size")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getQueueSize();
+                    }
+                }));
+        this.otelHeapQueueSize = openTelemetryMeter
+                .upDownCounterBuilder("topic.list.heap.queue.size")
+                .setDescription("Current heap memory limiter queue size")
+                .setUnit("1")
+                .buildWithCallback(observableLongMeasurement -> {
+                    
observableLongMeasurement.record(heapMemoryLimiter.getQueueSize());
+                });
+
+        this.heapQueueMaxSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_max_size",
+                        "Maximum heap memory limiter queue size")
+                .create());
+        this.heapQueueMaxSize.set(maxHeapQueueSize);
+        LongGauge otelHeapQueueMaxSize = openTelemetryMeter
+                .gaugeBuilder("topic.list.heap.queue.max.size")
+                .setDescription("Maximum heap memory limiter queue size")
+                .setUnit("1")
+                .ofLongs()
+                .build();
+        otelHeapQueueMaxSize.set(maxHeapQueueSize);

Review Comment:
   The `otelHeapQueueMaxSize` gauge is created as a local variable and cannot 
be closed in the `close()` method, potentially causing a resource leak.
   
   



##########
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.semaphore;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongConsumer;
+import org.apache.pulsar.common.util.Runnables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of AsyncSemaphore with timeout and queue size limits.
+ */
+public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable {
+    private static final Logger log = 
LoggerFactory.getLogger(AsyncSemaphoreImpl.class);
+
+    private final AtomicLong availablePermits;
+    private final ConcurrentLinkedQueue<PendingRequest> queue = new 
ConcurrentLinkedQueue<>();
+    private final long maxPermits;
+    private final int maxQueueSize;
+    private final long timeoutMillis;
+    private final ScheduledExecutorService executor;
+    private final boolean shutdownExecutor;
+    private final LongConsumer queueLatencyRecorder;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Runnable processQueueRunnable = 
Runnables.catchingAndLoggingThrowables(this::internalProcessQueue);
+
+    public AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long 
timeoutMillis) {
+        this(maxPermits, maxQueueSize, timeoutMillis, createExecutor(), true, 
null);
+    }
+
+    public AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long 
timeoutMillis,
+                              ScheduledExecutorService executor, LongConsumer 
queueLatencyRecorder) {
+        this(maxPermits, maxQueueSize, timeoutMillis, executor, false, 
queueLatencyRecorder);
+    }
+
+    AsyncSemaphoreImpl(long maxPermits, int maxQueueSize, long timeoutMillis, 
ScheduledExecutorService executor,
+                       boolean shutdownExecutor, LongConsumer 
queueLatencyRecorder) {
+        this.availablePermits = new AtomicLong(maxPermits);
+        this.maxPermits = maxPermits;
+        this.maxQueueSize = maxQueueSize;
+        this.timeoutMillis = timeoutMillis;
+        this.executor = executor;
+        this.shutdownExecutor = shutdownExecutor;
+        this.queueLatencyRecorder = queueLatencyRecorder;
+    }
+
+    private static ScheduledExecutorService createExecutor() {
+        return Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("async-semaphore-executor"));
+    }
+
+    @Override
+    public CompletableFuture<AsyncSemaphorePermit> acquire(long permits, 
BooleanSupplier isCancelled) {
+        return internalAcquire(permits, permits, isCancelled);
+    }
+
+    private CompletableFuture<AsyncSemaphorePermit> internalAcquire(long 
permits, long acquirePermits,
+                                                                    
BooleanSupplier isCancelled) {
+        if (permits < 0) {
+            throw new IllegalArgumentException("Invalid permits value: " + 
permits);
+        }
+
+        CompletableFuture<AsyncSemaphorePermit> future = new 
CompletableFuture<>();
+
+        if (closed.get()) {
+            future.completeExceptionally(new 
PermitAcquireAlreadyClosedException("Semaphore is closed"));
+            return future;
+        }
+
+        if (queue.size() >= maxQueueSize) {

Review Comment:
   The size() method on `ConcurrentLinkedQueue` iterates through all elements, 
making it an `O(n)` operation.
   Maintaining a separate AtomicInteger counter to track queue size in O(1) 
time?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:
##########
@@ -288,8 +294,13 @@ public void deleteTopicListWatcher(Long watcherId) {
      */
     public void sendTopicListUpdate(long watcherId, String topicsHash, 
List<String> deletedTopics,
                                     List<String> newTopics) {
-        connection.getCommandSender().sendWatchTopicListUpdate(watcherId, 
newTopics, deletedTopics, topicsHash);
+        connection.getCommandSender().sendWatchTopicListUpdate(watcherId, 
newTopics, deletedTopics, topicsHash,
+                t -> {
+                    // TODO add retry with backoff
+                    log.warn(
+                            "[{}] Cannot acquire direct memory tokens for 
sending topic list update. State will be "
+                                    + "inconsistent on the client. {}",
+                            connection.toString(), t.getMessage());
+                });

Review Comment:
   Extracting the error handling into a reusable method.
   ```java
   private void handleSendFailure(String operation, Throwable t, long 
watcherId) {
       // TODO: add retry with backoff
       log.warn("[{}] Cannot acquire direct memory tokens for sending topic 
list update. State will be "
                                       + "inconsistent on the client. {}",
                               connection.toString(), t.getMessage());
   }
   
   ...
   // Line 206
   connection.getCommandSender().sendWatchTopicListSuccess(requestId, 
watcherId, hash, topicList,
           t -> handleSendFailure("topic list success", t, watcherId));
   ...
   // Line 297
   connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, 
deletedTopics, topicsHash,
           t -> handleSendFailure("topic list update", t, watcherId));
   ...
   ```



##########
pip/pip-442.md:
##########
@@ -132,24 +134,48 @@ public interface AsyncSemaphore {
      * Returned future completes when permits are available.
      * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireTimeoutException`



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/TopicListMemoryLimiter.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.topiclistlimit;
+
+import io.opentelemetry.api.metrics.DoubleGauge;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongGauge;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableDoubleGauge;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Summary;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.semaphore.AsyncDualMemoryLimiterImpl;
+import org.apache.pulsar.common.semaphore.AsyncSemaphore;
+
+/**
+ * Topic list memory limiter that exposes both Prometheus metrics and 
OpenTelemetry metrics.
+ */
+@Slf4j
+public class TopicListMemoryLimiter extends AsyncDualMemoryLimiterImpl {
+    private final CollectorRegistry collectorRegistry;
+    private final Gauge heapMemoryUsedBytes;
+    private final Gauge heapMemoryLimitBytes;
+    private final Gauge directMemoryUsedBytes;
+    private final Gauge directMemoryLimitBytes;
+    private final Gauge heapQueueSize;
+    private final Gauge heapQueueMaxSize;
+    private final Gauge directQueueSize;
+    private final Gauge directQueueMaxSize;
+    private final Summary heapWaitTimeMs;
+    private final Summary directWaitTimeMs;
+    private final Counter heapTimeoutTotal;
+    private final Counter directTimeoutTotal;
+    private final ObservableDoubleGauge otelHeapMemoryUsedGauge;
+    private final DoubleGauge otelHeapMemoryLimitGauge;
+    private final ObservableDoubleGauge otelDirectMemoryUsedGauge;
+    private final DoubleGauge otelDirectMemoryLimitGauge;
+    private final ObservableLongUpDownCounter otelHeapQueueSize;
+    private final ObservableLongUpDownCounter otelDirectQueueSize;
+    private final DoubleHistogram otelHeapWaitTime;
+    private final DoubleHistogram otelDirectWaitTime;
+    private final LongCounter otelHeapTimeoutTotal;
+    private final LongCounter otelDirectTimeoutTotal;
+
+    public TopicListMemoryLimiter(CollectorRegistry collectorRegistry, String 
prometheusPrefix,
+                                  Meter openTelemetryMeter,
+                                  long maxHeapMemory, int maxHeapQueueSize,
+                                  long heapTimeoutMillis, long 
maxDirectMemory, int maxDirectQueueSize,
+                                  long directTimeoutMillis) {
+        super(maxHeapMemory, maxHeapQueueSize, heapTimeoutMillis, 
maxDirectMemory, maxDirectQueueSize,
+                directTimeoutMillis);
+        this.collectorRegistry = collectorRegistry;
+
+        AsyncSemaphore heapMemoryLimiter = getLimiter(LimitType.HEAP_MEMORY);
+        AsyncSemaphore directMemoryLimiter = 
getLimiter(LimitType.DIRECT_MEMORY);
+
+        this.heapMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_used_bytes",
+                        "Current heap memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelHeapMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.used")
+                .setUnit("By")
+                .setDescription("Current heap memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(heapMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.heapMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_memory_limit_bytes",
+                        "Configured heap memory limit")
+                .create());
+        this.heapMemoryLimitBytes.set(maxHeapMemory);
+        this.otelHeapMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.heap.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured heap memory limit")
+                .build();
+        this.otelHeapMemoryLimitGauge.set(maxHeapMemory);
+
+        this.directMemoryUsedBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_used_bytes",
+                        "Current direct memory used by topic listings")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return directMemoryLimiter.getAcquiredPermits();
+                    }
+                }));
+        this.otelDirectMemoryUsedGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.used")
+                .setUnit("By")
+                .setDescription("Current direct memory used by topic listings")
+                .buildWithCallback(observableDoubleMeasurement -> {
+                    
observableDoubleMeasurement.record(directMemoryLimiter.getAcquiredPermits());
+                });
+
+        this.directMemoryLimitBytes = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_memory_limit_bytes",
+                        "Configured direct memory limit")
+                .create());
+        this.directMemoryLimitBytes.set(maxDirectMemory);
+        this.otelDirectMemoryLimitGauge = 
openTelemetryMeter.gaugeBuilder("topic.list.direct.memory.limit")
+                .setUnit("By")
+                .setDescription("Configured direct memory limit")
+                .build();
+        this.otelDirectMemoryLimitGauge.set(maxDirectMemory);
+
+        this.heapQueueSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_size",
+                        "Current heap memory limiter queue size")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return heapMemoryLimiter.getQueueSize();
+                    }
+                }));
+        this.otelHeapQueueSize = openTelemetryMeter
+                .upDownCounterBuilder("topic.list.heap.queue.size")
+                .setDescription("Current heap memory limiter queue size")
+                .setUnit("1")
+                .buildWithCallback(observableLongMeasurement -> {
+                    
observableLongMeasurement.record(heapMemoryLimiter.getQueueSize());
+                });
+
+        this.heapQueueMaxSize = register(Gauge.build(prometheusPrefix + 
"topic_list_heap_queue_max_size",
+                        "Maximum heap memory limiter queue size")
+                .create());
+        this.heapQueueMaxSize.set(maxHeapQueueSize);
+        LongGauge otelHeapQueueMaxSize = openTelemetryMeter
+                .gaugeBuilder("topic.list.heap.queue.max.size")
+                .setDescription("Maximum heap memory limiter queue size")
+                .setUnit("1")
+                .ofLongs()
+                .build();
+        otelHeapQueueMaxSize.set(maxHeapQueueSize);
+
+        this.directQueueSize = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_queue_size",
+                        "Current direct memory limiter queue size")
+                .create()
+                .setChild(new Gauge.Child() {
+                    @Override
+                    public double get() {
+                        return directMemoryLimiter.getQueueSize();
+                    }
+                }));
+        this.otelDirectQueueSize = openTelemetryMeter
+                .upDownCounterBuilder("topic.list.direct.queue.size")
+                .setDescription("Current direct memory limiter queue size")
+                .setUnit("1")
+                .buildWithCallback(observableLongMeasurement -> {
+                    
observableLongMeasurement.record(directMemoryLimiter.getQueueSize());
+                });
+
+        this.directQueueMaxSize = register(Gauge.build(prometheusPrefix + 
"topic_list_direct_queue_max_size",
+                        "Maximum direct memory limiter queue size")
+                .create());
+        this.directQueueMaxSize.set(maxDirectQueueSize);
+        LongGauge otelDirectQueueMaxSize = openTelemetryMeter
+                .gaugeBuilder("topic.list.direct.queue.max.size")
+                .setDescription("Maximum direct memory limiter queue size")
+                .setUnit("1")
+                .ofLongs()
+                .build();
+        otelDirectQueueMaxSize.set(maxDirectQueueSize);

Review Comment:
   The `otelDirectQueueMaxSize` gauge is created as a local variable and cannot 
be closed in the `close()` method, potentially causing a resource leak.



##########
pip/pip-442.md:
##########
@@ -132,24 +134,48 @@ public interface AsyncSemaphore {
      * Returned future completes when permits are available.
      * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
      * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     *
+     * @param permits     number of permits to acquire
+     * @param isCancelled supplier that returns true if acquisition should be 
cancelled
      * @return CompletableFuture that completes with permit when available
      */
-    CompletableFuture<AsyncSemaphorePermitResult> acquire(long permits);
+    CompletableFuture<AsyncSemaphorePermit> acquire(long permits, 
BooleanSupplier isCancelled);
 
     /**
      * Acquire or release permits for previously acquired permits by updating 
the permits.
      * Returns a future that completes when permits are available.
      * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireTimeoutException`



##########
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphore.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.semaphore;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+
+/**
+ * An abstraction for a generic asynchronous semaphore.
+ */
+public interface AsyncSemaphore {
+    /**
+     * Acquire permits from the semaphore.
+     * Returned future completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireQueueFullException`



##########
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphore.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.semaphore;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
+
+/**
+ * An abstraction for a generic asynchronous semaphore.
+ */
+public interface AsyncSemaphore {
+    /**
+     * Acquire permits from the semaphore.
+     * Returned future completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     *
+     * @param permits     number of permits to acquire
+     * @param isCancelled supplier that returns true if acquisition should be 
cancelled
+     * @return CompletableFuture that completes with permit when available
+     */
+    CompletableFuture<AsyncSemaphorePermit> acquire(long permits, 
BooleanSupplier isCancelled);
+
+    /**
+     * Acquire or release permits for previously acquired permits by updating 
the permits.
+     * Returns a future that completes when permits are available.
+     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireTimeoutException`



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java:
##########
@@ -357,6 +357,59 @@ private void performGetTopicsOfNamespace(long 
clientRequestId,
         });
     }
 
+    private void internalPerformGetTopicsOfNamespace(long clientRequestId, 
String namespaceName, ClientCnx clientCnx,
+                                                     ByteBuf command, long 
requestId) {
+        BooleanSupplier isPermitRequestCancelled = () -> 
!proxyConnection.ctx().channel().isActive();
+        
maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE,
+                AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                    return clientCnx.newGetTopicsOfNamespace(command, 
requestId).whenComplete((r, t) -> {
+                        if (t != null) {
+                            log.warn("[{}] Failed to get TopicsOfNamespace {}: 
{}", clientAddress, namespaceName,
+                                    t.getMessage());
+                            writeAndFlush(Commands.newError(clientRequestId, 
getServerError(t), t.getMessage()));
+                        } else {
+                            long actualSize =
+                                    
r.getNonPartitionedOrPartitionTopics().stream().mapToInt(String::length).sum();
+                            
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    isPermitRequestCancelled, permits -> {
+                                return 
handleWritingGetTopicsResponse(clientRequestId, r, isPermitRequestCancelled);
+                            }, t2 -> {
+                                log.warn("[{}] Failed to acquire actual heap 
memory permits for "
+                                        + "GetTopicsOfNamespace: {}", 
clientAddress, t2.getMessage());
+                                
writeAndFlush(Commands.newError(clientRequestId, ServerError.TooManyRequests,
+                                        "Failed due to heap memory limit 
exceeded"));
+
+                                return CompletableFuture.completedFuture(null);
+                            });
+                        }
+                    }).thenApply(__ -> null);
+                }, t -> {
+                    log.warn("[{}] Failed to acquire initial heap memory 
permits for GetTopicsOfNamespace: {}",
+                            clientAddress, t.getMessage());
+                    writeAndFlush(Commands.newError(clientRequestId, 
ServerError.TooManyRequests,
+                            "Failed due to heap memory limit exceeded"));
+
+                    return CompletableFuture.completedFuture(null);
+                }).exceptionally(ex -> {
+            writeAndFlush(Commands.newError(clientRequestId, 
getServerError(ex), ex.getMessage()));
+            return null;
+        });
+    }
+
+    private CompletableFuture<Void> handleWritingGetTopicsResponse(long 
clientRequestId, GetTopicsResult r,
+                                                                   
BooleanSupplier isCancelled) {
+        BaseCommand responseCommand = 
Commands.newGetTopicsOfNamespaceResponseCommand(
+                r.getNonPartitionedOrPartitionTopics(), r.getTopicsHash(), 
r.isFiltered(),
+                r.isChanged(), clientRequestId);
+        return 
acquireDirectMemoryPermitsAndWriteAndFlush(proxyConnection.ctx(), 
maxTopicListInFlightLimiter,
+                isCancelled, responseCommand, t -> {
+                    log.warn("[{}] Failed to acquire actual direct memory 
permits for GetTopicsOfNamespace: {}",
+                            clientAddress, t.getMessage());
+                    writeAndFlush(Commands.newError(clientRequestId, 
ServerError.TooManyRequests,
+                            "Failed due to heap memory limit exceeded"));

Review Comment:
   "Failed due to direct memory limit exceeded"));



##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java:
##########
@@ -357,6 +357,59 @@ private void performGetTopicsOfNamespace(long 
clientRequestId,
         });
     }
 
+    private void internalPerformGetTopicsOfNamespace(long clientRequestId, 
String namespaceName, ClientCnx clientCnx,
+                                                     ByteBuf command, long 
requestId) {
+        BooleanSupplier isPermitRequestCancelled = () -> 
!proxyConnection.ctx().channel().isActive();
+        
maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE,
+                AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                    return clientCnx.newGetTopicsOfNamespace(command, 
requestId).whenComplete((r, t) -> {
+                        if (t != null) {
+                            log.warn("[{}] Failed to get TopicsOfNamespace {}: 
{}", clientAddress, namespaceName,
+                                    t.getMessage());
+                            writeAndFlush(Commands.newError(clientRequestId, 
getServerError(t), t.getMessage()));
+                        } else {
+                            long actualSize =
+                                    
r.getNonPartitionedOrPartitionTopics().stream().mapToInt(String::length).sum();

Review Comment:
   Java strings are UTF-16, so 2 bytes per character?



##########
pip/pip-442.md:
##########
@@ -132,24 +134,48 @@ public interface AsyncSemaphore {
      * Returned future completes when permits are available.
      * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
      * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full

Review Comment:
   typo: `AsyncSemaphore.PermitAcquireQueueFullException`



##########
pip/pip-442.md:
##########
@@ -163,227 +189,409 @@ public interface AsyncDualMemoryLimiter {
         HEAP_MEMORY,    // For heap memory allocation
         DIRECT_MEMORY   // For direct memory allocation
     }
-    
+
     /**
      * Acquire permits for the specified memory size.
      * Returned future completes when memory permits are available.
-     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
-     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     * It will complete exceptionally with 
AsyncSemaphore.PermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException 
when queue full
+     *
+     * @param memorySize  the size of memory to acquire permits for
+     * @param limitType   the type of memory limit (HEAP_MEMORY or 
DIRECT_MEMORY)
+     * @param isCancelled supplier that returns true if acquisition should be 
cancelled
      * @return CompletableFuture that completes with permit when available
      */
-    CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, 
LimitType limitType);
+    CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, 
LimitType limitType,
+                                                            BooleanSupplier 
isCancelled);
 
     /**
      * Acquire or release permits for previously acquired permits by updating 
the requested memory size.
      * Returns a future that completes when permits are available.
-     * It will complete exceptionally with 
AsyncSemaphorePermitAcquireTimeoutException on timeout
-     * and exceptionally with AsyncSemaphorePermitAcquireQueueFullException 
when queue full
+     * It will complete exceptionally with 
AsyncSemaphore.PermitAcquireTimeoutException on timeout
+     * and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException 
when queue full
+     * The provided permit is released when the permits are successfully 
acquired and the returned updated
+     * permit replaces the old instance.
+     *
+     * @param permit        the previously acquired permit to update
+     * @param newMemorySize the new memory size to update to
+     * @param isCancelled   supplier that returns true if update should be 
cancelled
      * @return CompletableFuture that completes with permit when available
      */
-    CompletableFuture<AsyncDualMemoryLimiterPermit> 
update(AsyncDualMemoryLimiterPermit permit, long newMemorySize);
-    
+    CompletableFuture<AsyncDualMemoryLimiterPermit> 
update(AsyncDualMemoryLimiterPermit permit, long newMemorySize,
+                                                           BooleanSupplier 
isCancelled);
+
     /**
      * Release previously acquired permit.
      * Must be called to prevent memory permit leaks.
+     *
+     * @param permit the permit to release
      */
     void release(AsyncDualMemoryLimiterPermit permit);
+    /**
+     * Execute the specified function with acquired permits and release the 
permits after the returned future completes.
+     * @param memorySize memory size to acquire permits for
+     * @param limitType memory limit type to acquire permits for
+     * @param function function to execute with acquired permits
+     * @return result of the function
+     * @param <T> type of the CompletableFuture returned by the function
+     */
+    default <T> CompletableFuture<T> withAcquiredPermits(long memorySize, 
LimitType limitType,
+                                                         BooleanSupplier 
isCancelled,
+                                                         
Function<AsyncDualMemoryLimiterPermit,
+                                                                 
CompletableFuture<T>> function,
+                                                         Function<Throwable, 
CompletableFuture<T>>
+                                                                 
permitAcquireErrorHandler) {
+        return 
AsyncDualMemoryLimiterUtil.withPermitsFuture(acquire(memorySize, limitType, 
isCancelled), function,
+                permitAcquireErrorHandler, this::release);
+    }
+
+    /**
+     * Executed the specified function with updated permits and release the 
permits after the returned future completes.
+     * @param initialPermit initial permit to update
+     * @param newMemorySize new memory size to update to
+     * @param function function to execute with updated permits
+     * @return result of the function
+     * @param <T> type of the CompletableFuture returned by the function
+     */
+    default <T> CompletableFuture<T> 
withUpdatedPermits(AsyncDualMemoryLimiterPermit initialPermit, long 
newMemorySize,
+                                                        BooleanSupplier 
isCancelled,
+                                                        
Function<AsyncDualMemoryLimiterPermit,
+                                                                
CompletableFuture<T>> function,
+                                                        Function<Throwable, 
CompletableFuture<T>>
+                                                                
permitAcquireErrorHandler) {
+        return 
AsyncDualMemoryLimiterUtil.withPermitsFuture(update(initialPermit, 
newMemorySize, isCancelled), function,
+                permitAcquireErrorHandler, this::release);
+    }
+}
+```
+
+#### AsyncDualMemoryLimiterUtil Helper
+
+A utility class provides helper methods for common patterns:
+
+```java
+public class AsyncDualMemoryLimiterUtil {
+    /**
+     * Execute a function with acquired permits and ensure permits are 
released after completion.
+     * This method handles the lifecycle of permits - acquisition, usage, and 
release, including error cases.
+     *
+     * @param permitsFuture             Future that will complete with the 
required permits
+     * @param function                  Function to execute once permits are 
acquired that returns a CompletableFuture
+     * @param permitAcquireErrorHandler Handler for permit acquisition errors 
that returns a CompletableFuture
+     * @param releaser                  Consumer that handles releasing the 
permits
+     * @param <T>                       The type of result returned by the 
function
+     * @return CompletableFuture that completes with the result of the 
function execution
+     */
+    public static <T> CompletableFuture<T> withPermitsFuture(
+            CompletableFuture<AsyncDualMemoryLimiterPermit>
+                    permitsFuture,
+            Function<AsyncDualMemoryLimiterPermit,
+                    CompletableFuture<T>> function,
+            Function<Throwable, CompletableFuture<T>>
+                    permitAcquireErrorHandler,
+            Consumer<AsyncDualMemoryLimiterPermit> releaser) {
+        // implementation omitted from PIP document
+    }
+
+    /**
+     * Acquires permits and writes the command as a response to the channel.
+     * Releases the permits after the response has been written to the socket 
or if the write fails.
+     *
+     * @param ctx the channel handler context used for writing the response
+     * @param dualMemoryLimiter the memory limiter used to acquire and release 
memory permits
+     * @param isCancelled supplier that indicates if the permit acquisition 
should be cancelled
+     * @param command the base command to serialize and write to the channel
+     * @param permitAcquireErrorHandler handler for errors that occur during 
permit acquisition
+     * @return a future that completes when the command has been written to 
the channel's outbound buffer
+     */
+    public static CompletableFuture<Void> 
acquireDirectMemoryPermitsAndWriteAndFlush(ChannelHandlerContext ctx,
+                                                                               
      AsyncDualMemoryLimiter
+                                                                               
              dualMemoryLimiter,
+                                                                               
      BooleanSupplier isCancelled,
+                                                                               
      BaseCommand command,
+                                                                               
      Consumer<Throwable>
+                                                                               
              permitAcquireErrorHandler
+    ) {
+        // implementation omitted from PIP document
+    }
 }
 ```
 
 #### Integration Points
 
-**1. Heap Memory Limiting (Post-Retrieval)**
+**1. Heap Memory Limiting (Post-Retrieval) - Broker**
 
-In `ServerCnx.handleGetTopicsOfNamespace`:
+In `ServerCnx.handleGetTopicsOfNamespace`, the implementation uses the helper 
methods:
 
 ```java
-// Acquire a fixed amount of permits initially since it's not known how much 
memory will be used
-// This will ensure that the operation continues only after it has the initial 
permits
-// It would be possible to use statistics for initial estimate, but this is 
simpler and sufficient
-maxTopicListInFlightLimiter.acquire(INITIAL_SIZE, 
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY)
-    .thenCompose(initialPermit -> {
-        
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName,
 mode)
-            .thenCompose(topics -> {
-                // Estimate memory after retrieval and update the permits to 
reflect the actual size
-                long estimatedSize = 
topics.stream().mapToInt(String::length).sum();
-                return maxTopicListInFlightLimiter
-                    .update(initialPermit, estimatedSize)
-                    .thenApply(permit -> Pair.of(topics, permit));
-            })
-            .thenAccept(topicsAndPermit -> {
-                try {
-                    // Process and send response
-                    ...
-                } finally {
-                    
maxTopicListInFlightLimiter.release(topicsAndPermit.getRight());
-                }
+    private void internalHandleGetTopicsOfNamespace(String namespace, 
NamespaceName namespaceName, long requestId,
+                                                    
CommandGetTopicsOfNamespace.Mode mode,
+                                                    Optional<String> 
topicsPattern, Optional<String> topicsHash,
+                                                    Semaphore lookupSemaphore) 
{
+    BooleanSupplier isPermitRequestCancelled = () -> 
!ctx().channel().isActive();
+    
maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE,
+            AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, 
isPermitRequestCancelled, initialPermits -> {
+                return 
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName,
 mode)
+                        .thenAccept(topics -> {
+                            long actualSize = 
topics.stream().mapToInt(String::length).sum();
+                            
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
+                                    isPermitRequestCancelled, permits -> {
+                                        boolean filterTopics = false;
+                                        // filter system topic
+                                        List<String> filteredTopics = topics;
+
+                                        if 
(enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
+                                            if (topicsPattern.get().length() 
<= maxSubscriptionPatternLength) {
+                                                filterTopics = true;
+                                                filteredTopics = 
TopicList.filterTopics(filteredTopics, topicsPattern.get(),
+                                                        
topicsPatternImplementation);
+                                            } else {
+                                                log.info("[{}] Subscription 
pattern provided [{}] was longer than "
+                                                                + "maximum 
{}.", remoteAddress, topicsPattern.get(),
+                                                        
maxSubscriptionPatternLength);
+                                            }
+                                        }
+                                        String hash = 
TopicList.calculateHash(filteredTopics);
+                                        boolean hashUnchanged = 
topicsHash.isPresent() && topicsHash.get().equals(hash);
+                                        if (hashUnchanged) {
+                                            filteredTopics = 
Collections.emptyList();
+                                        }
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] Received 
CommandGetTopicsOfNamespace for namespace "
+                                                            + "[//{}] by {}, 
size:{}", remoteAddress, namespace,
+                                                    requestId,
+                                                    topics.size());
+                                        }
+                                        
commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash, 
filterTopics,
+                                                !hashUnchanged, requestId, ex 
-> {
+                                                    log.warn("[{}] Failed to 
acquire direct memory permits for "
+                                                            + 
"GetTopicsOfNamespace: {}", remoteAddress, ex.getMessage());
+                                                    
commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
+                                                            "Cannot acquire 
permits for direct memory");
+                                                });
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }, t -> {
+                                        log.warn("[{}] Failed to acquire heap 
memory permits for "
+                                                + "GetTopicsOfNamespace: {}", 
remoteAddress, t.getMessage());
+                                        
writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests,
+                                                "Failed due to heap memory 
limit exceeded"));
+                                        return 
CompletableFuture.completedFuture(null);
+                                    });
+                        }).whenComplete((__, ___) -> {
+                            lookupSemaphore.release();
+                        }).exceptionally(ex -> {
+                            log.warn("[{}] Error GetTopicsOfNamespace for 
namespace [//{}] by {}", remoteAddress,
+                                    namespace, requestId);
+                            commandSender.sendErrorResponse(requestId,
+                                    
BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
+                                    ex.getMessage());
+                            return null;
+                        });
+            }, t -> {
+                log.warn("[{}] Failed to acquire initial heap memory permits 
for GetTopicsOfNamespace: {}",
+                        remoteAddress, t.getMessage());
+                writeAndFlush(Commands.newError(requestId, 
ServerError.TooManyRequests,
+                        "Failed due to heap memory limit exceeded"));
+                lookupSemaphore.release();
+                return CompletableFuture.completedFuture(null);
             });
-        ...
-    // For exceptional paths, initialPermit would need to be released
+}
 ```
 
-**2. Direct Memory Limiting (Pre-Serialization)**
+**2. Direct Memory Limiting (Pre-Serialization) - Broker**
 
-Modified `CommandSender` implementation:
+Modified `PulsarCommandSenderImpl`:
 
 ```java
 @Override
 public void sendGetTopicsOfNamespaceResponse(List<String> topics, String 
topicsHash,
-                                             boolean filtered, boolean 
changed, long requestId) {
+                                             boolean filtered, boolean 
changed, long requestId,
+                                             Consumer<Throwable> 
permitAcquireErrorHandler) {
     BaseCommand command = 
Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash,
             filtered, changed, requestId);
     safeIntercept(command, cnx);
-    acquireMaxTopicListInFlightPermitsAndWriteAndFlush(command);
+    acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), 
maxTopicListInFlightLimiter, () -> !cnx.isActive(),
+            command, permitAcquireErrorHandler);
 }
+```
 
-private void acquireMaxTopicListInFlightPermitsAndWriteAndFlush(BaseCommand 
command) {
+The utility method implementation:
+
+```java
+public static CompletableFuture<Void> 
acquireDirectMemoryPermitsAndWriteAndFlush(ChannelHandlerContext ctx,
+                                                                               
      AsyncDualMemoryLimiter
+                                                                               
              dualMemoryLimiter,
+                                                                               
      BooleanSupplier isCancelled,
+                                                                               
      BaseCommand command,
+                                                                               
      Consumer<Throwable>
+                                                                               
              permitAcquireErrorHandler
+) {
     // Calculate serialized size before acquiring permits
     int serializedSize = command.getSerializedSize();
     // Acquire permits
-    maxTopicListInFlightLimiter.acquire(serializedSize, 
AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY)
-            .thenAcceptAsync(permits -> {
+    return dualMemoryLimiter.acquire(serializedSize, 
AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY, isCancelled)
+            .whenComplete((permits, t) -> {
+                if (t != null) {
+                    permitAcquireErrorHandler.accept(t);
+                    return;
+                }
                 try {
                     // Serialize the response
                     ByteBuf outBuf = 
Commands.serializeWithPrecalculatedSerializedSize(command, serializedSize);
                     // Write the response
-                    cnx.ctx().writeAndFlush(outBuf).addListener(future -> {
+                    ctx.writeAndFlush(outBuf).addListener(future -> {
                         // Release permits after the response has been written 
to the socket
-                        maxTopicListInFlightLimiter.release(permits);
+                        dualMemoryLimiter.release(permits);
                     });
                 } catch (Exception e) {

Review Comment:
   The `withPermitsFuture` method catches `Throwable (line 71)` to ensure 
permits are released even for Error subclasses.
   
   ```java
   } catch (Throwable e) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to