This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c9c5bb49b05 [feat][misc] PIP-264: Add OpenTelemetry messaging rate limit metrics (#23035)
c9c5bb49b05 is described below

commit c9c5bb49b05118429426a4589cd8f57c47980318
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jul 23 12:05:25 2024 -0700

    [feat][misc] PIP-264: Add OpenTelemetry messaging rate limit metrics (#23035)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   2 +-
 .../mledger/impl/cache/InflightReadsLimiter.java   |  49 +++++++++-
 .../impl/cache/RangeEntryCacheManagerImpl.java     |   5 +-
 .../impl/cache/InflightReadsLimiterTest.java       |  85 ++++++++++++++---
 .../impl/cache/PendingReadsManagerTest.java        |  36 ++++----
 .../pulsar/broker/service/BrokerService.java       |  37 ++++++--
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +-
 .../broker/service/ServerCnxThrottleTracker.java   |  14 +--
 .../service/MessagePublishBufferThrottleTest.java  | 102 +++++++++++++++++----
 .../opentelemetry/OpenTelemetryAttributes.java     |  19 ++++
 10 files changed, 278 insertions(+), 75 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 00afb85a9d4..398575461d5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -213,7 +213,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                 compressionConfigForManagedCursorInfo);
         this.config = config;
         this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
-        this.entryCacheManager = new RangeEntryCacheManagerImpl(this);
+        this.entryCacheManager = new RangeEntryCacheManagerImpl(this, 
openTelemetry);
         this.statsTask = 
scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
                 0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
         this.flushCursorsTask = 
scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index b946dc09a0c..c87807b8663 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -19,20 +19,37 @@
 package org.apache.bookkeeper.mledger.impl.cache;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.ObservableLongCounter;
 import io.prometheus.client.Gauge;
 import lombok.AllArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.opentelemetry.Constants;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
 @Slf4j
-public class InflightReadsLimiter {
+public class InflightReadsLimiter implements AutoCloseable {
 
+    public static final String INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME =
+            "pulsar.broker.managed_ledger.inflight.read.limit";
+    private final ObservableLongCounter inflightReadsLimitCounter;
+
+    @PulsarDeprecatedMetric(newMetricName = 
INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
+    @Deprecated
     private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge
             .build()
             .name("pulsar_ml_reads_inflight_bytes")
             .help("Estimated number of bytes retained by data read from 
storage or cache")
             .register();
 
+    public static final String INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME =
+            "pulsar.broker.managed_ledger.inflight.read.usage";
+    private final ObservableLongCounter inflightReadsUsageCounter;
+
+    @PulsarDeprecatedMetric(newMetricName = 
INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
+    @Deprecated
     private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
             .build()
             .name("pulsar_ml_reads_available_inflight_bytes")
@@ -42,7 +59,7 @@ public class InflightReadsLimiter {
     private final long maxReadsInFlightSize;
     private long remainingBytes;
 
-    public InflightReadsLimiter(long maxReadsInFlightSize) {
+    public InflightReadsLimiter(long maxReadsInFlightSize, OpenTelemetry 
openTelemetry) {
         if (maxReadsInFlightSize <= 0) {
             // set it to -1 in order to show in the metrics that the metric is 
not available
             PULSAR_ML_READS_BUFFER_SIZE.set(-1);
@@ -50,6 +67,28 @@ public class InflightReadsLimiter {
         }
         this.maxReadsInFlightSize = maxReadsInFlightSize;
         this.remainingBytes = maxReadsInFlightSize;
+
+        var meter = 
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+        inflightReadsLimitCounter = 
meter.counterBuilder(INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
+                .setDescription("Maximum number of bytes that can be retained 
by managed ledger data read from storage "
+                        + "or cache.")
+                .setUnit("By")
+                .buildWithCallback(measurement -> {
+                    if (!isDisabled()) {
+                        measurement.record(maxReadsInFlightSize);
+                    }
+                });
+        inflightReadsUsageCounter = 
meter.counterBuilder(INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
+                .setDescription("Estimated number of bytes retained by managed 
ledger data read from storage or cache.")
+                .setUnit("By")
+                .buildWithCallback(measurement -> {
+                    if (!isDisabled()) {
+                        var freeBytes = getRemainingBytes();
+                        var usedBytes = maxReadsInFlightSize - freeBytes;
+                        measurement.record(freeBytes, 
InflightReadLimiterUtilization.FREE.attributes);
+                        measurement.record(usedBytes, 
InflightReadLimiterUtilization.USED.attributes);
+                    }
+                });
     }
 
     @VisibleForTesting
@@ -57,6 +96,12 @@ public class InflightReadsLimiter {
         return remainingBytes;
     }
 
+    @Override
+    public void close() {
+        inflightReadsLimitCounter.close();
+        inflightReadsUsageCounter.close();
+    }
+
     @AllArgsConstructor
     @ToString
     static class Handle {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index d5a3019855c..34be25df1f4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl.cache;
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+import io.opentelemetry.api.OpenTelemetry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -56,10 +57,10 @@ public class RangeEntryCacheManagerImpl implements 
EntryCacheManager {
     private static final double evictionTriggerThresholdPercent = 0.98;
 
 
-    public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+    public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory, 
OpenTelemetry openTelemetry) {
         this.maxSize = factory.getConfig().getMaxCacheSize();
         this.inflightReadsLimiter = new InflightReadsLimiter(
-                factory.getConfig().getManagedLedgerMaxReadsInFlightSize());
+                factory.getConfig().getManagedLedgerMaxReadsInFlightSize(), 
openTelemetry);
         this.evictionTriggerThreshold = (long) (maxSize * 
evictionTriggerThresholdPercent);
         this.cacheEvictionWatermark = 
factory.getConfig().getCacheEvictionWatermark();
         this.evictionPolicy = new EntryCacheDefaultEvictionPolicy();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
index 2b69581ca2c..89bdda15afb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
@@ -18,45 +18,79 @@
  */
 package org.apache.bookkeeper.mledger.impl.cache;
 
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization.FREE;
+import static 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization.USED;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 public class InflightReadsLimiterTest {
 
-    @Test
-    public void testDisabled() throws Exception {
-
-        InflightReadsLimiter limiter = new InflightReadsLimiter(0);
-        assertTrue(limiter.isDisabled());
-
-        limiter = new InflightReadsLimiter(-1);
-        assertTrue(limiter.isDisabled());
+    @DataProvider
+    private static Object[][] isDisabled() {
+        return new Object[][] {
+            {0, true},
+            {-1, true},
+            {1, false},
+        };
+    }
 
-        limiter = new InflightReadsLimiter(1);
-        assertFalse(limiter.isDisabled());
+    @Test(dataProvider = "isDisabled")
+    public void testDisabled(long maxReadsInFlightSize, boolean 
shouldBeDisabled) throws Exception {
+        var otel = buildOpenTelemetryAndReader();
+        @Cleanup var openTelemetry = otel.getLeft();
+        @Cleanup var metricReader = otel.getRight();
+
+        var limiter = new InflightReadsLimiter(maxReadsInFlightSize, 
openTelemetry);
+        assertEquals(limiter.isDisabled(), shouldBeDisabled);
+
+        if (shouldBeDisabled) {
+            // Verify metrics are not present
+            var metrics = metricReader.collectAllMetrics();
+            assertThat(metrics).noneSatisfy(metricData -> 
assertThat(metricData)
+                    
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME));
+            assertThat(metrics).noneSatisfy(metricData -> 
assertThat(metricData)
+                    
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME));
+        }
     }
 
     @Test
     public void testBasicAcquireRelease() throws Exception {
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+        var otel = buildOpenTelemetryAndReader();
+        @Cleanup var openTelemetry = otel.getLeft();
+        @Cleanup var metricReader = otel.getRight();
+
+        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
openTelemetry);
         assertEquals(100, limiter.getRemainingBytes());
+        assertLimiterMetrics(metricReader, 100, 0, 100);
+
         InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
         assertEquals(0, limiter.getRemainingBytes());
         assertTrue(handle.success);
         assertEquals(handle.acquiredPermits, 100);
         assertEquals(1, handle.trials);
+        assertLimiterMetrics(metricReader, 100, 100, 0);
+
         limiter.release(handle);
         assertEquals(100, limiter.getRemainingBytes());
+        assertLimiterMetrics(metricReader, 100, 0, 100);
     }
 
+
     @Test
     public void testNotEnoughPermits() throws Exception {
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
OpenTelemetry.noop());
         assertEquals(100, limiter.getRemainingBytes());
         InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
         assertEquals(0, limiter.getRemainingBytes());
@@ -86,7 +120,7 @@ public class InflightReadsLimiterTest {
 
     @Test
     public void testPartialAcquire() throws Exception {
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
OpenTelemetry.noop());
         assertEquals(100, limiter.getRemainingBytes());
 
         InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
@@ -116,7 +150,7 @@ public class InflightReadsLimiterTest {
 
     @Test
     public void testTooManyTrials() throws Exception {
-        InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+        InflightReadsLimiter limiter = new InflightReadsLimiter(100, 
OpenTelemetry.noop());
         assertEquals(100, limiter.getRemainingBytes());
 
         InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
@@ -169,4 +203,25 @@ public class InflightReadsLimiterTest {
 
     }
 
+    private Pair<OpenTelemetrySdk, InMemoryMetricReader> 
buildOpenTelemetryAndReader() {
+        var metricReader = InMemoryMetricReader.create();
+        var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
+                .addMeterProviderCustomizer((builder, __) -> 
builder.registerMetricReader(metricReader))
+                .build()
+                .getOpenTelemetrySdk();
+        return Pair.of(openTelemetry, metricReader);
+    }
+
+    private void assertLimiterMetrics(InMemoryMetricReader metricReader,
+                                      long expectedLimit, long expectedUsed, 
long expectedFree) {
+        var metrics = metricReader.collectAllMetrics();
+        assertThat(metrics).anySatisfy(metricData -> assertThat(metricData)
+                
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
+                .hasLongSumSatisfying(longSum -> 
longSum.hasPointsSatisfying(point -> point.hasValue(expectedLimit))));
+        assertThat(metrics).anySatisfy(metricData -> assertThat(metricData)
+                
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
+                .hasLongSumSatisfying(longSum -> longSum.hasPointsSatisfying(
+                        point -> 
point.hasValue(expectedFree).hasAttributes(FREE.attributes),
+                        point -> 
point.hasValue(expectedUsed).hasAttributes(USED.attributes))));
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index 6f573ff8d75..01976f648ab 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -18,8 +18,24 @@
  */
 package org.apache.bookkeeper.mledger.impl.cache;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertNotSame;
+import static org.testng.AssertJUnit.assertSame;
+import io.opentelemetry.api.OpenTelemetry;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -30,7 +46,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterClass;
@@ -38,23 +53,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.AssertJUnit.assertNotSame;
-import static org.testng.AssertJUnit.assertSame;
-
 @Slf4j
 public class PendingReadsManagerTest  {
 
@@ -93,7 +91,7 @@ public class PendingReadsManagerTest  {
         config.setReadEntryTimeoutSeconds(10000);
         when(rangeEntryCache.getName()).thenReturn("my-topic");
         when(rangeEntryCache.getManagedLedgerConfig()).thenReturn(config);
-        inflighReadsLimiter = new InflightReadsLimiter(0);
+        inflighReadsLimiter = new InflightReadsLimiter(0, 
OpenTelemetry.noop());
         
when(rangeEntryCache.getPendingReadsLimiter()).thenReturn(inflighReadsLimiter);
         pendingReadsManager = new PendingReadsManager(rangeEntryCache);
         doAnswer(new Answer() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c0f44838ac6..5ea055287eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -40,7 +40,9 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Gauge;
 import io.prometheus.client.Histogram;
 import java.io.Closeable;
 import java.io.IOException;
@@ -182,6 +184,7 @@ import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionRateLimitOperationName;
 import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -258,6 +261,15 @@ public class BrokerService implements Closeable {
     private final ObservableLongUpDownCounter 
pendingTopicLoadOperationsCounter;
     private final ObservableLongUpDownCounter 
pendingTopicLoadOperationsLimitCounter;
 
+    public static final String CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME = 
"pulsar.broker.connection.rate_limit.count";
+    private final LongCounter rateLimitedConnectionsCounter;
+    @PulsarDeprecatedMetric(newMetricName = 
CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME)
+    @Deprecated
+    private static final Gauge throttledConnectionsGauge = Gauge.build()
+            .name("pulsar_broker_throttled_connections")
+            .help("Counter of connections throttled because of per-connection 
limit")
+            .register();
+
     private final ScheduledExecutorService inactivityMonitor;
     private final ScheduledExecutorService messageExpiryMonitor;
     private final ScheduledExecutorService compactionMonitor;
@@ -301,7 +313,6 @@ public class BrokerService implements Closeable {
     private Channel listenChannelTls;
 
     private boolean preciseTopicPublishRateLimitingEnable;
-    private final LongAdder pausedConnections = new LongAdder();
     private BrokerInterceptor interceptor;
     private final EntryFilterProvider entryFilterProvider;
     private TopicFactory topicFactory;
@@ -456,6 +467,12 @@ public class BrokerService implements Closeable {
                 .buildWithCallback(
                         measurement -> 
measurement.record(pulsar.getConfig().getMaxConcurrentTopicLoadRequest()));
 
+        this.rateLimitedConnectionsCounter = 
pulsar.getOpenTelemetry().getMeter()
+                
.counterBuilder(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME)
+                .setDescription("The number of times a connection has been 
rate limited.")
+                .setUnit("{operation}")
+                .build();
+
         this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils
                 
.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(),
                         BrokerService.class.getClassLoader());
@@ -3701,16 +3718,22 @@ public class BrokerService implements Closeable {
         return !brokerEntryPayloadProcessors.isEmpty();
     }
 
-    public void pausedConnections(int numberOfConnections) {
-        pausedConnections.add(numberOfConnections);
+    public void recordConnectionPaused() {
+        rateLimitedConnectionsCounter.add(1, 
ConnectionRateLimitOperationName.PAUSED.attributes);
+    }
+
+    public void recordConnectionResumed() {
+        rateLimitedConnectionsCounter.add(1, 
ConnectionRateLimitOperationName.RESUMED.attributes);
     }
 
-    public void resumedConnections(int numberOfConnections) {
-        pausedConnections.add(-numberOfConnections);
+    public void recordConnectionThrottled() {
+        rateLimitedConnectionsCounter.add(1, 
ConnectionRateLimitOperationName.THROTTLED.attributes);
+        throttledConnectionsGauge.inc();
     }
 
-    public long getPausedConnections() {
-        return pausedConnections.longValue();
+    public void recordConnectionUnthrottled() {
+        rateLimitedConnectionsCounter.add(1, 
ConnectionRateLimitOperationName.UNTHROTTLED.attributes);
+        throttledConnectionsGauge.dec();
     }
 
     @SuppressWarnings("unchecked")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 260552c55c0..6690ab4af5f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -298,8 +298,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         Start, Connected, Failed, Connecting
     }
 
-    private final ServerCnxThrottleTracker throttleTracker = new 
ServerCnxThrottleTracker(this);
-
+    private final ServerCnxThrottleTracker throttleTracker;
 
     public ServerCnx(PulsarService pulsar) {
         this(pulsar, null);
@@ -348,6 +347,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.topicListService = new TopicListService(pulsar, this,
                 enableSubscriptionPatternEvaluation, 
maxSubscriptionPatternLength);
         this.brokerInterceptor = this.service != null ? 
this.service.getInterceptor() : null;
+        this.throttleTracker = new ServerCnxThrottleTracker(this);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
index 7e55397022d..78bac024218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import io.prometheus.client.Gauge;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import lombok.extern.slf4j.Slf4j;
 
@@ -38,10 +37,6 @@ import lombok.extern.slf4j.Slf4j;
  */
 @Slf4j
 final class ServerCnxThrottleTracker {
-    private static final Gauge throttledConnections = Gauge.build()
-            .name("pulsar_broker_throttled_connections")
-            .help("Counter of connections throttled because of per-connection 
limit")
-            .register();
 
     private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker> 
THROTTLE_COUNT_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(
@@ -59,6 +54,7 @@ final class ServerCnxThrottleTracker {
     private volatile int pendingSendRequestsExceeded;
     private volatile int publishBufferLimiting;
 
+
     public ServerCnxThrottleTracker(ServerCnx serverCnx) {
         this.serverCnx = serverCnx;
 
@@ -94,10 +90,10 @@ final class ServerCnxThrottleTracker {
         }
         // update the metrics that track throttling
         if (autoRead) {
-            serverCnx.getBrokerService().resumedConnections(1);
+            serverCnx.getBrokerService().recordConnectionResumed();
         } else if (isChannelActive()) {
             serverCnx.increasePublishLimitedTimesForTopics();
-            serverCnx.getBrokerService().pausedConnections(1);
+            serverCnx.getBrokerService().recordConnectionPaused();
         }
     }
 
@@ -114,9 +110,9 @@ final class ServerCnxThrottleTracker {
         if (changed) {
             // update the metrics that track throttling due to pending send 
requests
             if (throttlingEnabled) {
-                throttledConnections.inc();
+                serverCnx.getBrokerService().recordConnectionThrottled();
             } else {
-                throttledConnections.dec();
+                serverCnx.getBrokerService().recordConnectionUnthrottled();
             }
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 27f72eac942..0faae14da08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -18,12 +18,18 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static org.testng.Assert.assertEquals;
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.testng.Assert.fail;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionRateLimitOperationName;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -43,6 +49,12 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
     @Test
     public void testMessagePublishBufferThrottleDisabled() throws Exception {
         conf.setMaxMessagePublishBufferSizeInMB(-1);
@@ -52,7 +64,8 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
                 .topic(topic)
                 .producerName("producer-name")
                 .create();
-         assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
 
         pulsarTestContext.getMockBookKeeper().addEntryDelay(1, 
TimeUnit.SECONDS);
 
@@ -63,7 +76,8 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
         }
         producer.flush();
 
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
     }
 
     @Test
@@ -71,14 +85,14 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
         conf.setMaxMessagePublishBufferSizeInMB(1);
         super.baseSetup();
 
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
         final String topic = 
"persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .producerName("producer-name")
                 .create();
 
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
 
         pulsarTestContext.getMockBookKeeper().addEntryDelay(1, 
TimeUnit.SECONDS);
 
@@ -87,23 +101,27 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
             producer.sendAsync(payload);
         }
 
-        Awaitility.await().untilAsserted(
-                () -> 
Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 1L));
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+        Awaitility.await().untilAsserted(() -> {
+            assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+            assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 
0);
+        });
 
         producer.flush();
 
-        Awaitility.await().untilAsserted(
-            () -> 
Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 0L));
-
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+        Awaitility.await().untilAsserted(() -> {
+            assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+            assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 
0);
+        });
     }
 
     @Test
     public void testBlockByPublishRateLimiting() throws Exception {
         conf.setMaxMessagePublishBufferSizeInMB(1);
         super.baseSetup();
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+        assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
+
         final String topic = 
"persistent://prop/ns-abc/testBlockByPublishRateLimiting";
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
@@ -111,7 +129,8 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
                 .create();
         Topic topicRef = 
pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
 
         pulsarTestContext.getMockBookKeeper().addEntryDelay(5, 
TimeUnit.SECONDS);
 
@@ -121,13 +140,15 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
             producer.sendAsync(payload);
         }
 
-        Awaitility.await().untilAsserted(() -> 
assertEquals(pulsar.getBrokerService().getPausedConnections(), 1));
+        Awaitility.await().untilAsserted(() -> 
assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1));
 
         CompletableFuture<Void> flushFuture = producer.flushAsync();
 
         // Block by publish rate.
         // After 1 second, the message buffer throttling will be lifted, but 
the rate limiting will still be in place.
-        assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
+
         try {
             flushFuture.get(2, TimeUnit.SECONDS);
             fail("Should have timed out");
@@ -137,7 +158,52 @@ public class MessagePublishBufferThrottleTest extends 
BrokerTestBase {
 
         flushFuture.join();
 
-        Awaitility.await().untilAsserted(() ->
-                assertEquals(pulsar.getBrokerService().getPausedConnections(), 
0));
+        Awaitility.await().untilAsserted(() -> {
+            assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 
1);
+            assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 
1);
+        });
+    }
+
+    @Test
+    public void testConnectionThrottled() throws Exception {
+        super.baseSetup();
+
+        var topic = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testSendThrottled");
+
+        assertRateLimitCounter(ConnectionRateLimitOperationName.THROTTLED, 0);
+        assertRateLimitCounter(ConnectionRateLimitOperationName.UNTHROTTLED, 
0);
+
+        @Cleanup
+        var producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false)
+                .topic(topic)
+                .create();
+        final int messages = 2000;
+        for (int i = 0; i < messages; i++) {
+            producer.sendAsync("Message - " + i);
+        }
+        producer.flush();
+
+        // Wait for the connection to be throttled and unthrottled.
+        Awaitility.await().untilAsserted(() -> {
+            var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+            assertMetricLongSumValue(metrics, 
BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+                    ConnectionRateLimitOperationName.THROTTLED.attributes, 
value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(metrics, 
BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+                    ConnectionRateLimitOperationName.UNTHROTTLED.attributes, 
value -> assertThat(value).isPositive());
+        });
+    }
+
+    private void assertRateLimitCounter(ConnectionRateLimitOperationName 
connectionRateLimitState, int expectedCount) {
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        if (expectedCount == 0) {
+            assertThat(metrics).noneSatisfy(metricData -> 
assertThat(metricData)
+                    
.hasName(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME)
+                    .hasLongSumSatisfying(sum -> sum.hasPointsSatisfying(
+                            points -> 
points.hasAttributes(connectionRateLimitState.attributes))));
+        } else {
+            assertMetricLongSumValue(metrics, 
BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+                    connectionRateLimitState.attributes, expectedCount);
+        }
     }
 }
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 41358a72c0d..6eb84e94bc6 100644
--- 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -112,6 +112,17 @@ public interface OpenTelemetryAttributes {
      */
     AttributeKey<String> PULSAR_CLIENT_VERSION = 
AttributeKey.stringKey("pulsar.client.version");
 
+    AttributeKey<String> PULSAR_CONNECTION_RATE_LIMIT_OPERATION_NAME =
+            
AttributeKey.stringKey("pulsar.connection.rate_limit.operation.name");
+    enum ConnectionRateLimitOperationName {
+        PAUSED,
+        RESUMED,
+        THROTTLED,
+        UNTHROTTLED;
+        public final Attributes attributes =
+                Attributes.of(PULSAR_CONNECTION_RATE_LIMIT_OPERATION_NAME, 
name().toLowerCase());
+    }
+
     /**
      * The status of the Pulsar transaction.
      */
@@ -197,6 +208,14 @@ public interface OpenTelemetryAttributes {
         public final Attributes attributes = 
Attributes.of(ML_CURSOR_OPERATION_STATUS, name().toLowerCase());
     }
 
+    AttributeKey<String> MANAGED_LEDGER_READ_INFLIGHT_USAGE =
+            
AttributeKey.stringKey("pulsar.managed_ledger.inflight.read.usage.state");
+    enum InflightReadLimiterUtilization {
+        USED,
+        FREE;
+        public final Attributes attributes = 
Attributes.of(MANAGED_LEDGER_READ_INFLIGHT_USAGE, name().toLowerCase());
+    }
+
     /**
      * The name of the remote cluster for a Pulsar replicator.
      */


Reply via email to