This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 59136a0ffa0 [feat][misc] PIP-264: Add OpenTelemetry HTTP rate limiting filter metric (#23042)
59136a0ffa0 is described below
commit 59136a0ffa0b833411b8af4b7ef9b9c7eb74f909
Author: Dragos Misca <[email protected]>
AuthorDate: Wed Jul 17 10:06:39 2024 -0700
[feat][misc] PIP-264: Add OpenTelemetry HTTP rate limiting filter metric (#23042)
---
.../pulsar/broker/web/RateLimitingFilter.java | 27 ++++++++++++++++++++--
.../org/apache/pulsar/broker/web/WebService.java | 3 ++-
.../apache/pulsar/broker/web/WebServiceTest.java | 26 ++++++++++++++++++++-
.../worker/PulsarWorkerOpenTelemetry.java | 4 +++-
.../pulsar/functions/worker/rest/WorkerServer.java | 5 +++-
.../org/apache/pulsar/proxy/server/WebServer.java | 5 +++-
.../proxy/stats/PulsarProxyOpenTelemetry.java | 4 +++-
7 files changed, 66 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java
index 502b691fa34..0618df6609c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java
@@ -19,6 +19,10 @@
package org.apache.pulsar.broker.web;
import com.google.common.util.concurrent.RateLimiter;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
import io.prometheus.client.Counter;
import java.io.IOException;
import javax.servlet.Filter;
@@ -33,15 +37,32 @@ public class RateLimitingFilter implements Filter {
private final RateLimiter limiter;
- public RateLimitingFilter(double rateLimit) {
- limiter = RateLimiter.create(rateLimit);
+ public static final String RATE_LIMIT_REQUEST_COUNT_METRIC_NAME =
+ "pulsar.web.filter.rate_limit.request.count";
+ private final LongCounter rateLimitRequestCounter;
+
+ public static final AttributeKey<String> RATE_LIMIT_RESULT =
+ AttributeKey.stringKey("pulsar.web.filter.rate_limit.result");
+ public enum Result {
+ ACCEPTED,
+ REJECTED;
+ public final Attributes attributes = Attributes.of(RATE_LIMIT_RESULT,
name().toLowerCase());
}
+ @Deprecated
private static final Counter httpRejectedRequests = Counter.build()
.name("pulsar_broker_http_rejected_requests")
.help("Counter of HTTP requests rejected by rate limiting")
.register();
+ public RateLimitingFilter(double rateLimit, Meter meter) {
+ limiter = RateLimiter.create(rateLimit);
+ rateLimitRequestCounter =
meter.counterBuilder(RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
+ .setDescription("Counter of HTTP requests processed by the
rate limiting filter.")
+ .setUnit("{request}")
+ .build();
+ }
+
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@@ -50,9 +71,11 @@ public class RateLimitingFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain)
throws IOException, ServletException {
if (limiter.tryAcquire()) {
+ rateLimitRequestCounter.add(1, Result.ACCEPTED.attributes);
chain.doFilter(request, response);
} else {
httpRejectedRequests.inc();
+ rateLimitRequestCounter.add(1, Result.REJECTED.attributes);
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(429, "Too Many Requests");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index c969f40ad43..d95e88661ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -250,7 +250,8 @@ public class WebService implements AutoCloseable {
if (config.isHttpRequestsLimitEnabled()) {
filterHolders.add(new FilterHolder(
- new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+ new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
+ pulsarService.getOpenTelemetry().getMeter())));
}
// wait until the PulsarService is ready to serve incoming requests
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 30644237a74..08041d72c7e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -18,10 +18,10 @@
*/
package org.apache.pulsar.broker.web;
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -61,6 +61,7 @@ import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.broker.web.RateLimitingFilter.Result;
import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.LimitType;
import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.UsageType;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -270,12 +271,29 @@ public class WebServiceTest {
public void testRateLimiting() throws Exception {
setupEnv(false, false, false, false, 10.0, false);
+ // setupEnv makes a HTTP call to create the cluster.
+ var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+ Result.ACCEPTED.attributes, 1);
+ assertThat(metrics).noneSatisfy(metricData -> assertThat(metricData)
+
.hasName(RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
+ .hasLongSumSatisfying(
+ sum -> sum.hasPointsSatisfying(point ->
point.hasAttributes(Result.REJECTED.attributes))));
+
// Make requests without exceeding the max rate
for (int i = 0; i < 5; i++) {
makeHttpRequest(false, false);
Thread.sleep(200);
}
+ metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+ Result.ACCEPTED.attributes, 6);
+ assertThat(metrics).noneSatisfy(metricData -> assertThat(metricData)
+
.hasName(RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
+ .hasLongSumSatisfying(
+ sum -> sum.hasPointsSatisfying(point ->
point.hasAttributes(Result.REJECTED.attributes))));
+
try {
for (int i = 0; i < 500; i++) {
makeHttpRequest(false, false);
@@ -285,6 +303,12 @@ public class WebServiceTest {
} catch (IOException e) {
assertTrue(e.getMessage().contains("429"));
}
+
+ metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+ Result.ACCEPTED.attributes, value ->
assertThat(value).isGreaterThan(6));
+ assertMetricLongSumValue(metrics,
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+ Result.REJECTED.attributes, value ->
assertThat(value).isPositive());
}
@Test
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
index be7c15dfd85..6673a89659a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.opentelemetry.OpenTelemetryService;
public class PulsarWorkerOpenTelemetry implements Closeable {
public static final String SERVICE_NAME = "pulsar-function-worker";
+ public static final String INSTRUMENTATION_SCOPE_NAME =
"org.apache.pulsar.function_worker";
+
private final OpenTelemetryService openTelemetryService;
@Getter
@@ -38,7 +40,7 @@ public class PulsarWorkerOpenTelemetry implements Closeable {
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
- meter =
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
+ meter =
openTelemetryService.getOpenTelemetry().getMeter(INSTRUMENTATION_SCOPE_NAME);
}
@Override
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 583d8ce558b..1d8c66a57df 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker.rest;
+import io.opentelemetry.api.OpenTelemetry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -30,6 +31,7 @@ import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.broker.web.JettyRequestLogFactory;
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
+import org.apache.pulsar.functions.worker.PulsarWorkerOpenTelemetry;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
@@ -219,7 +221,8 @@ public class WorkerServer {
if (config.isHttpRequestsLimitEnabled()) {
filterHolders.add(new FilterHolder(
- new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+ new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
+
OpenTelemetry.noop().getMeter(PulsarWorkerOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))));
}
if (config.isAuthenticationEnabled()) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 478b911eb23..ad94f1b65a0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.server;
import static
org.apache.pulsar.proxy.server.AdminProxyHandler.INIT_PARAM_REQUEST_BUFFER_SIZE;
+import io.opentelemetry.api.OpenTelemetry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.io.IOException;
import java.net.URI;
@@ -37,6 +38,7 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
@@ -191,7 +193,8 @@ public class WebServer {
if (config.isHttpRequestsLimitEnabled()) {
filterHolders.add(new FilterHolder(
- new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+ new
RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
+
OpenTelemetry.noop().getMeter(PulsarProxyOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))));
}
if (config.isAuthenticationEnabled()) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
index 14bbc649466..2748e2c3df5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
@@ -28,6 +28,8 @@ import org.apache.pulsar.proxy.server.ProxyConfiguration;
public class PulsarProxyOpenTelemetry implements Closeable {
public static final String SERVICE_NAME = "pulsar-proxy";
+ public static final String INSTRUMENTATION_SCOPE_NAME =
"org.apache.pulsar.proxy";
+
private final OpenTelemetryService openTelemetryService;
@Getter
@@ -39,7 +41,7 @@ public class PulsarProxyOpenTelemetry implements Closeable {
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
- meter =
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.proxy");
+ meter =
openTelemetryService.getOpenTelemetry().getMeter(INSTRUMENTATION_SCOPE_NAME);
}
@Override