This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c9c5bb49b05 [feat][misc] PIP-264: Add OpenTelemetry messaging rate limit metrics (#23035)
c9c5bb49b05 is described below
commit c9c5bb49b05118429426a4589cd8f57c47980318
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jul 23 12:05:25 2024 -0700
[feat][misc] PIP-264: Add OpenTelemetry messaging rate limit metrics (#23035)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 2 +-
.../mledger/impl/cache/InflightReadsLimiter.java | 49 +++++++++-
.../impl/cache/RangeEntryCacheManagerImpl.java | 5 +-
.../impl/cache/InflightReadsLimiterTest.java | 85 ++++++++++++++---
.../impl/cache/PendingReadsManagerTest.java | 36 ++++----
.../pulsar/broker/service/BrokerService.java | 37 ++++++--
.../apache/pulsar/broker/service/ServerCnx.java | 4 +-
.../broker/service/ServerCnxThrottleTracker.java | 14 +--
.../service/MessagePublishBufferThrottleTest.java | 102 +++++++++++++++++----
.../opentelemetry/OpenTelemetryAttributes.java | 19 ++++
10 files changed, 278 insertions(+), 75 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 00afb85a9d4..398575461d5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -213,7 +213,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
compressionConfigForManagedCursorInfo);
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
- this.entryCacheManager = new RangeEntryCacheManagerImpl(this);
+ this.entryCacheManager = new RangeEntryCacheManagerImpl(this,
openTelemetry);
this.statsTask =
scheduledExecutor.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::refreshStats),
0, config.getStatsPeriodSeconds(), TimeUnit.SECONDS);
this.flushCursorsTask =
scheduledExecutor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::flushCursors),
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
index b946dc09a0c..c87807b8663 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java
@@ -19,20 +19,37 @@
package org.apache.bookkeeper.mledger.impl.cache;
import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.prometheus.client.Gauge;
import lombok.AllArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.opentelemetry.Constants;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
@Slf4j
-public class InflightReadsLimiter {
+public class InflightReadsLimiter implements AutoCloseable {
+ public static final String INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME =
+ "pulsar.broker.managed_ledger.inflight.read.limit";
+ private final ObservableLongCounter inflightReadsLimitCounter;
+
+ @PulsarDeprecatedMetric(newMetricName =
INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
+ @Deprecated
private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_reads_inflight_bytes")
.help("Estimated number of bytes retained by data read from
storage or cache")
.register();
+ public static final String INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME =
+ "pulsar.broker.managed_ledger.inflight.read.usage";
+ private final ObservableLongCounter inflightReadsUsageCounter;
+
+ @PulsarDeprecatedMetric(newMetricName =
INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
+ @Deprecated
private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_reads_available_inflight_bytes")
@@ -42,7 +59,7 @@ public class InflightReadsLimiter {
private final long maxReadsInFlightSize;
private long remainingBytes;
- public InflightReadsLimiter(long maxReadsInFlightSize) {
+ public InflightReadsLimiter(long maxReadsInFlightSize, OpenTelemetry
openTelemetry) {
if (maxReadsInFlightSize <= 0) {
// set it to -1 in order to show in the metrics that the metric is
not available
PULSAR_ML_READS_BUFFER_SIZE.set(-1);
@@ -50,6 +67,28 @@ public class InflightReadsLimiter {
}
this.maxReadsInFlightSize = maxReadsInFlightSize;
this.remainingBytes = maxReadsInFlightSize;
+
+ var meter =
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+ inflightReadsLimitCounter =
meter.counterBuilder(INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
+ .setDescription("Maximum number of bytes that can be retained
by managed ledger data read from storage "
+ + "or cache.")
+ .setUnit("By")
+ .buildWithCallback(measurement -> {
+ if (!isDisabled()) {
+ measurement.record(maxReadsInFlightSize);
+ }
+ });
+ inflightReadsUsageCounter =
meter.counterBuilder(INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
+ .setDescription("Estimated number of bytes retained by managed
ledger data read from storage or cache.")
+ .setUnit("By")
+ .buildWithCallback(measurement -> {
+ if (!isDisabled()) {
+ var freeBytes = getRemainingBytes();
+ var usedBytes = maxReadsInFlightSize - freeBytes;
+ measurement.record(freeBytes,
InflightReadLimiterUtilization.FREE.attributes);
+ measurement.record(usedBytes,
InflightReadLimiterUtilization.USED.attributes);
+ }
+ });
}
@VisibleForTesting
@@ -57,6 +96,12 @@ public class InflightReadsLimiter {
return remainingBytes;
}
+ @Override
+ public void close() {
+ inflightReadsLimitCounter.close();
+ inflightReadsUsageCounter.close();
+ }
+
@AllArgsConstructor
@ToString
static class Handle {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
index d5a3019855c..34be25df1f4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger.impl.cache;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import io.opentelemetry.api.OpenTelemetry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -56,10 +57,10 @@ public class RangeEntryCacheManagerImpl implements EntryCacheManager {
private static final double evictionTriggerThresholdPercent = 0.98;
- public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) {
+ public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory,
OpenTelemetry openTelemetry) {
this.maxSize = factory.getConfig().getMaxCacheSize();
this.inflightReadsLimiter = new InflightReadsLimiter(
- factory.getConfig().getManagedLedgerMaxReadsInFlightSize());
+ factory.getConfig().getManagedLedgerMaxReadsInFlightSize(),
openTelemetry);
this.evictionTriggerThreshold = (long) (maxSize *
evictionTriggerThresholdPercent);
this.cacheEvictionWatermark =
factory.getConfig().getCacheEvictionWatermark();
this.evictionPolicy = new EntryCacheDefaultEvictionPolicy();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
index 2b69581ca2c..89bdda15afb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java
@@ -18,45 +18,79 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization.FREE;
+import static
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.InflightReadLimiterUtilization.USED;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
public class InflightReadsLimiterTest {
- @Test
- public void testDisabled() throws Exception {
-
- InflightReadsLimiter limiter = new InflightReadsLimiter(0);
- assertTrue(limiter.isDisabled());
-
- limiter = new InflightReadsLimiter(-1);
- assertTrue(limiter.isDisabled());
+ @DataProvider
+ private static Object[][] isDisabled() {
+ return new Object[][] {
+ {0, true},
+ {-1, true},
+ {1, false},
+ };
+ }
- limiter = new InflightReadsLimiter(1);
- assertFalse(limiter.isDisabled());
+ @Test(dataProvider = "isDisabled")
+ public void testDisabled(long maxReadsInFlightSize, boolean
shouldBeDisabled) throws Exception {
+ var otel = buildOpenTelemetryAndReader();
+ @Cleanup var openTelemetry = otel.getLeft();
+ @Cleanup var metricReader = otel.getRight();
+
+ var limiter = new InflightReadsLimiter(maxReadsInFlightSize,
openTelemetry);
+ assertEquals(limiter.isDisabled(), shouldBeDisabled);
+
+ if (shouldBeDisabled) {
+ // Verify metrics are not present
+ var metrics = metricReader.collectAllMetrics();
+ assertThat(metrics).noneSatisfy(metricData ->
assertThat(metricData)
+
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME));
+ assertThat(metrics).noneSatisfy(metricData ->
assertThat(metricData)
+
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME));
+ }
}
@Test
public void testBasicAcquireRelease() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+ var otel = buildOpenTelemetryAndReader();
+ @Cleanup var openTelemetry = otel.getLeft();
+ @Cleanup var metricReader = otel.getRight();
+
+ InflightReadsLimiter limiter = new InflightReadsLimiter(100,
openTelemetry);
assertEquals(100, limiter.getRemainingBytes());
+ assertLimiterMetrics(metricReader, 100, 0, 100);
+
InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
assertEquals(0, limiter.getRemainingBytes());
assertTrue(handle.success);
assertEquals(handle.acquiredPermits, 100);
assertEquals(1, handle.trials);
+ assertLimiterMetrics(metricReader, 100, 100, 0);
+
limiter.release(handle);
assertEquals(100, limiter.getRemainingBytes());
+ assertLimiterMetrics(metricReader, 100, 0, 100);
}
+
@Test
public void testNotEnoughPermits() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+ InflightReadsLimiter limiter = new InflightReadsLimiter(100,
OpenTelemetry.noop());
assertEquals(100, limiter.getRemainingBytes());
InflightReadsLimiter.Handle handle = limiter.acquire(100, null);
assertEquals(0, limiter.getRemainingBytes());
@@ -86,7 +120,7 @@ public class InflightReadsLimiterTest {
@Test
public void testPartialAcquire() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+ InflightReadsLimiter limiter = new InflightReadsLimiter(100,
OpenTelemetry.noop());
assertEquals(100, limiter.getRemainingBytes());
InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
@@ -116,7 +150,7 @@ public class InflightReadsLimiterTest {
@Test
public void testTooManyTrials() throws Exception {
- InflightReadsLimiter limiter = new InflightReadsLimiter(100);
+ InflightReadsLimiter limiter = new InflightReadsLimiter(100,
OpenTelemetry.noop());
assertEquals(100, limiter.getRemainingBytes());
InflightReadsLimiter.Handle handle = limiter.acquire(30, null);
@@ -169,4 +203,25 @@ public class InflightReadsLimiterTest {
}
+ private Pair<OpenTelemetrySdk, InMemoryMetricReader>
buildOpenTelemetryAndReader() {
+ var metricReader = InMemoryMetricReader.create();
+ var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
+ .addMeterProviderCustomizer((builder, __) ->
builder.registerMetricReader(metricReader))
+ .build()
+ .getOpenTelemetrySdk();
+ return Pair.of(openTelemetry, metricReader);
+ }
+
+ private void assertLimiterMetrics(InMemoryMetricReader metricReader,
+ long expectedLimit, long expectedUsed,
long expectedFree) {
+ var metrics = metricReader.collectAllMetrics();
+ assertThat(metrics).anySatisfy(metricData -> assertThat(metricData)
+
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_LIMIT_METRIC_NAME)
+ .hasLongSumSatisfying(longSum ->
longSum.hasPointsSatisfying(point -> point.hasValue(expectedLimit))));
+ assertThat(metrics).anySatisfy(metricData -> assertThat(metricData)
+
.hasName(InflightReadsLimiter.INFLIGHT_READS_LIMITER_USAGE_METRIC_NAME)
+ .hasLongSumSatisfying(longSum -> longSum.hasPointsSatisfying(
+ point ->
point.hasValue(expectedFree).hasAttributes(FREE.attributes),
+ point ->
point.hasValue(expectedUsed).hasAttributes(USED.attributes))));
+ }
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
index 6f573ff8d75..01976f648ab 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java
@@ -18,8 +18,24 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertNotSame;
+import static org.testng.AssertJUnit.assertSame;
+import io.opentelemetry.api.OpenTelemetry;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
@@ -30,7 +46,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterClass;
@@ -38,23 +53,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.AssertJUnit.assertNotSame;
-import static org.testng.AssertJUnit.assertSame;
-
@Slf4j
public class PendingReadsManagerTest {
@@ -93,7 +91,7 @@ public class PendingReadsManagerTest {
config.setReadEntryTimeoutSeconds(10000);
when(rangeEntryCache.getName()).thenReturn("my-topic");
when(rangeEntryCache.getManagedLedgerConfig()).thenReturn(config);
- inflighReadsLimiter = new InflightReadsLimiter(0);
+ inflighReadsLimiter = new InflightReadsLimiter(0,
OpenTelemetry.noop());
when(rangeEntryCache.getPendingReadsLimiter()).thenReturn(inflighReadsLimiter);
pendingReadsManager = new PendingReadsManager(rangeEntryCache);
doAnswer(new Answer() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c0f44838ac6..5ea055287eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -40,7 +40,9 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.io.Closeable;
import java.io.IOException;
@@ -182,6 +184,7 @@ import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionRateLimitOperationName;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -258,6 +261,15 @@ public class BrokerService implements Closeable {
private final ObservableLongUpDownCounter
pendingTopicLoadOperationsCounter;
private final ObservableLongUpDownCounter
pendingTopicLoadOperationsLimitCounter;
+ public static final String CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME =
"pulsar.broker.connection.rate_limit.count";
+ private final LongCounter rateLimitedConnectionsCounter;
+ @PulsarDeprecatedMetric(newMetricName =
CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME)
+ @Deprecated
+ private static final Gauge throttledConnectionsGauge = Gauge.build()
+ .name("pulsar_broker_throttled_connections")
+ .help("Counter of connections throttled because of per-connection
limit")
+ .register();
+
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
@@ -301,7 +313,6 @@ public class BrokerService implements Closeable {
private Channel listenChannelTls;
private boolean preciseTopicPublishRateLimitingEnable;
- private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;
private final EntryFilterProvider entryFilterProvider;
private TopicFactory topicFactory;
@@ -456,6 +467,12 @@ public class BrokerService implements Closeable {
.buildWithCallback(
measurement ->
measurement.record(pulsar.getConfig().getMaxConcurrentTopicLoadRequest()));
+ this.rateLimitedConnectionsCounter =
pulsar.getOpenTelemetry().getMeter()
+
.counterBuilder(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME)
+ .setDescription("The number of times a connection has been
rate limited.")
+ .setUnit("{operation}")
+ .build();
+
this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils
.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(),
BrokerService.class.getClassLoader());
@@ -3701,16 +3718,22 @@ public class BrokerService implements Closeable {
return !brokerEntryPayloadProcessors.isEmpty();
}
- public void pausedConnections(int numberOfConnections) {
- pausedConnections.add(numberOfConnections);
+ public void recordConnectionPaused() {
+ rateLimitedConnectionsCounter.add(1,
ConnectionRateLimitOperationName.PAUSED.attributes);
+ }
+
+ public void recordConnectionResumed() {
+ rateLimitedConnectionsCounter.add(1,
ConnectionRateLimitOperationName.RESUMED.attributes);
}
- public void resumedConnections(int numberOfConnections) {
- pausedConnections.add(-numberOfConnections);
+ public void recordConnectionThrottled() {
+ rateLimitedConnectionsCounter.add(1,
ConnectionRateLimitOperationName.THROTTLED.attributes);
+ throttledConnectionsGauge.inc();
}
- public long getPausedConnections() {
- return pausedConnections.longValue();
+ public void recordConnectionUnthrottled() {
+ rateLimitedConnectionsCounter.add(1,
ConnectionRateLimitOperationName.UNTHROTTLED.attributes);
+ throttledConnectionsGauge.dec();
}
@SuppressWarnings("unchecked")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 260552c55c0..6690ab4af5f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -298,8 +298,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
Start, Connected, Failed, Connecting
}
- private final ServerCnxThrottleTracker throttleTracker = new
ServerCnxThrottleTracker(this);
-
+ private final ServerCnxThrottleTracker throttleTracker;
public ServerCnx(PulsarService pulsar) {
this(pulsar, null);
@@ -348,6 +347,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
this.topicListService = new TopicListService(pulsar, this,
enableSubscriptionPatternEvaluation,
maxSubscriptionPatternLength);
this.brokerInterceptor = this.service != null ?
this.service.getInterceptor() : null;
+ this.throttleTracker = new ServerCnxThrottleTracker(this);
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
index 7e55397022d..78bac024218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service;
-import io.prometheus.client.Gauge;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.extern.slf4j.Slf4j;
@@ -38,10 +37,6 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
final class ServerCnxThrottleTracker {
- private static final Gauge throttledConnections = Gauge.build()
- .name("pulsar_broker_throttled_connections")
- .help("Counter of connections throttled because of per-connection
limit")
- .register();
private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
THROTTLE_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(
@@ -59,6 +54,7 @@ final class ServerCnxThrottleTracker {
private volatile int pendingSendRequestsExceeded;
private volatile int publishBufferLimiting;
+
public ServerCnxThrottleTracker(ServerCnx serverCnx) {
this.serverCnx = serverCnx;
@@ -94,10 +90,10 @@ final class ServerCnxThrottleTracker {
}
// update the metrics that track throttling
if (autoRead) {
- serverCnx.getBrokerService().resumedConnections(1);
+ serverCnx.getBrokerService().recordConnectionResumed();
} else if (isChannelActive()) {
serverCnx.increasePublishLimitedTimesForTopics();
- serverCnx.getBrokerService().pausedConnections(1);
+ serverCnx.getBrokerService().recordConnectionPaused();
}
}
@@ -114,9 +110,9 @@ final class ServerCnxThrottleTracker {
if (changed) {
// update the metrics that track throttling due to pending send
requests
if (throttlingEnabled) {
- throttledConnections.inc();
+ serverCnx.getBrokerService().recordConnectionThrottled();
} else {
- throttledConnections.dec();
+ serverCnx.getBrokerService().recordConnectionUnthrottled();
}
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 27f72eac942..0faae14da08 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -18,12 +18,18 @@
*/
package org.apache.pulsar.broker.service;
-import static org.testng.Assert.assertEquals;
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.testng.Assert.fail;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionRateLimitOperationName;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -43,6 +49,12 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
super.internalCleanup();
}
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+ pulsarTestContextBuilder.enableOpenTelemetry(true);
+ }
+
@Test
public void testMessagePublishBufferThrottleDisabled() throws Exception {
conf.setMaxMessagePublishBufferSizeInMB(-1);
@@ -52,7 +64,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
.topic(topic)
.producerName("producer-name")
.create();
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
pulsarTestContext.getMockBookKeeper().addEntryDelay(1,
TimeUnit.SECONDS);
@@ -63,7 +76,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
}
producer.flush();
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
}
@Test
@@ -71,14 +85,14 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
conf.setMaxMessagePublishBufferSizeInMB(1);
super.baseSetup();
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
final String topic =
"persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer-name")
.create();
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
pulsarTestContext.getMockBookKeeper().addEntryDelay(1,
TimeUnit.SECONDS);
@@ -87,23 +101,27 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
producer.sendAsync(payload);
}
- Awaitility.await().untilAsserted(
- () ->
Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 1L));
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+ Awaitility.await().untilAsserted(() -> {
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED,
0);
+ });
producer.flush();
- Awaitility.await().untilAsserted(
- () ->
Assert.assertEquals(pulsar.getBrokerService().getPausedConnections(), 0L));
-
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+ Awaitility.await().untilAsserted(() -> {
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED,
0);
+ });
}
@Test
public void testBlockByPublishRateLimiting() throws Exception {
conf.setMaxMessagePublishBufferSizeInMB(1);
super.baseSetup();
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
+
final String topic =
"persistent://prop/ns-abc/testBlockByPublishRateLimiting";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
@@ -111,7 +129,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
.create();
Topic topicRef =
pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
pulsarTestContext.getMockBookKeeper().addEntryDelay(5,
TimeUnit.SECONDS);
@@ -121,13 +140,15 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
producer.sendAsync(payload);
}
- Awaitility.await().untilAsserted(() ->
assertEquals(pulsar.getBrokerService().getPausedConnections(), 1));
+ Awaitility.await().untilAsserted(() ->
assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1));
CompletableFuture<Void> flushFuture = producer.flushAsync();
// Block by publish rate.
// After 1 second, the message buffer throttling will be lifted, but
the rate limiting will still be in place.
- assertEquals(pulsar.getBrokerService().getPausedConnections(), 1);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED, 1);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED, 0);
+
try {
flushFuture.get(2, TimeUnit.SECONDS);
fail("Should have timed out");
@@ -137,7 +158,52 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
flushFuture.join();
- Awaitility.await().untilAsserted(() ->
- assertEquals(pulsar.getBrokerService().getPausedConnections(),
0));
+ Awaitility.await().untilAsserted(() -> {
+ assertRateLimitCounter(ConnectionRateLimitOperationName.PAUSED,
10);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.RESUMED,
10);
+ });
+ }
+
+ @Test
+ public void testConnectionThrottled() throws Exception {
+ super.baseSetup();
+
+ var topic =
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testSendThrottled");
+
+ assertRateLimitCounter(ConnectionRateLimitOperationName.THROTTLED, 0);
+ assertRateLimitCounter(ConnectionRateLimitOperationName.UNTHROTTLED,
0);
+
+ @Cleanup
+ var producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(false)
+ .topic(topic)
+ .create();
+ final int messages = 2000;
+ for (int i = 0; i < messages; i++) {
+ producer.sendAsync("Message - " + i);
+ }
+ producer.flush();
+
+ // Wait for the connection to be throttled and unthrottled.
+ Awaitility.await().untilAsserted(() -> {
+ var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+ ConnectionRateLimitOperationName.THROTTLED.attributes,
value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics,
BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+ ConnectionRateLimitOperationName.UNTHROTTLED.attributes,
value -> assertThat(value).isPositive());
+ });
+ }
+
+ private void assertRateLimitCounter(ConnectionRateLimitOperationName
connectionRateLimitState, int expectedCount) {
+ var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ if (expectedCount == 0) {
+ assertThat(metrics).noneSatisfy(metricData ->
assertThat(metricData)
+
.hasName(BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME)
+ .hasLongSumSatisfying(sum -> sum.hasPointsSatisfying(
+ points ->
points.hasAttributes(connectionRateLimitState.attributes))));
+ } else {
+ assertMetricLongSumValue(metrics,
BrokerService.CONNECTION_RATE_LIMIT_COUNT_METRIC_NAME,
+ connectionRateLimitState.attributes, expectedCount);
+ }
}
}
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 41358a72c0d..6eb84e94bc6 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -112,6 +112,17 @@ public interface OpenTelemetryAttributes {
*/
AttributeKey<String> PULSAR_CLIENT_VERSION =
AttributeKey.stringKey("pulsar.client.version");
+ AttributeKey<String> PULSAR_CONNECTION_RATE_LIMIT_OPERATION_NAME =
+
AttributeKey.stringKey("pulsar.connection.rate_limit.operation.name");
+ enum ConnectionRateLimitOperationName {
+ PAUSED,
+ RESUMED,
+ THROTTLED,
+ UNTHROTTLED;
+ public final Attributes attributes =
+ Attributes.of(PULSAR_CONNECTION_RATE_LIMIT_OPERATION_NAME,
name().toLowerCase());
+ }
+
/**
* The status of the Pulsar transaction.
*/
@@ -197,6 +208,14 @@ public interface OpenTelemetryAttributes {
public final Attributes attributes =
Attributes.of(ML_CURSOR_OPERATION_STATUS, name().toLowerCase());
}
+ AttributeKey<String> MANAGED_LEDGER_READ_INFLIGHT_USAGE =
+
AttributeKey.stringKey("pulsar.managed_ledger.inflight.read.usage.state");
+ enum InflightReadLimiterUtilization {
+ USED,
+ FREE;
+ public final Attributes attributes =
Attributes.of(MANAGED_LEDGER_READ_INFLIGHT_USAGE, name().toLowerCase());
+ }
+
/**
* The name of the remote cluster for a Pulsar replicator.
*/