This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 59136a0ffa0 [feat][misc] PIP-264: Add OpenTelemetry HTTP rate limiting 
filter metric (#23042)
59136a0ffa0 is described below

commit 59136a0ffa0b833411b8af4b7ef9b9c7eb74f909
Author: Dragos Misca <[email protected]>
AuthorDate: Wed Jul 17 10:06:39 2024 -0700

    [feat][misc] PIP-264: Add OpenTelemetry HTTP rate limiting filter metric 
(#23042)
---
 .../pulsar/broker/web/RateLimitingFilter.java      | 27 ++++++++++++++++++++--
 .../org/apache/pulsar/broker/web/WebService.java   |  3 ++-
 .../apache/pulsar/broker/web/WebServiceTest.java   | 26 ++++++++++++++++++++-
 .../worker/PulsarWorkerOpenTelemetry.java          |  4 +++-
 .../pulsar/functions/worker/rest/WorkerServer.java |  5 +++-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  5 +++-
 .../proxy/stats/PulsarProxyOpenTelemetry.java      |  4 +++-
 7 files changed, 66 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java
index 502b691fa34..0618df6609c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/RateLimitingFilter.java
@@ -19,6 +19,10 @@
 package org.apache.pulsar.broker.web;
 
 import com.google.common.util.concurrent.RateLimiter;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
 import io.prometheus.client.Counter;
 import java.io.IOException;
 import javax.servlet.Filter;
@@ -33,15 +37,32 @@ public class RateLimitingFilter implements Filter {
 
     private final RateLimiter limiter;
 
-    public RateLimitingFilter(double rateLimit) {
-        limiter = RateLimiter.create(rateLimit);
+    public static final String RATE_LIMIT_REQUEST_COUNT_METRIC_NAME =
+            "pulsar.web.filter.rate_limit.request.count";
+    private final LongCounter rateLimitRequestCounter;
+
+    public static final AttributeKey<String> RATE_LIMIT_RESULT =
+            AttributeKey.stringKey("pulsar.web.filter.rate_limit.result");
+    public enum Result {
+        ACCEPTED,
+        REJECTED;
+        public final Attributes attributes = Attributes.of(RATE_LIMIT_RESULT, 
name().toLowerCase());
     }
 
+    @Deprecated
     private static final Counter httpRejectedRequests = Counter.build()
             .name("pulsar_broker_http_rejected_requests")
             .help("Counter of HTTP requests rejected by rate limiting")
             .register();
 
+    public RateLimitingFilter(double rateLimit, Meter meter) {
+        limiter = RateLimiter.create(rateLimit);
+        rateLimitRequestCounter = 
meter.counterBuilder(RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
+                .setDescription("Counter of HTTP requests processed by the 
rate limiting filter.")
+                .setUnit("{request}")
+                .build();
+    }
+
     @Override
     public void init(FilterConfig filterConfig) throws ServletException {
     }
@@ -50,9 +71,11 @@ public class RateLimitingFilter implements Filter {
     public void doFilter(ServletRequest request, ServletResponse response, 
FilterChain chain)
             throws IOException, ServletException {
         if (limiter.tryAcquire()) {
+            rateLimitRequestCounter.add(1, Result.ACCEPTED.attributes);
             chain.doFilter(request, response);
         } else {
             httpRejectedRequests.inc();
+            rateLimitRequestCounter.add(1, Result.REJECTED.attributes);
             HttpServletResponse httpResponse = (HttpServletResponse) response;
             httpResponse.sendError(429, "Too Many Requests");
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index c969f40ad43..d95e88661ae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -250,7 +250,8 @@ public class WebService implements AutoCloseable {
 
             if (config.isHttpRequestsLimitEnabled()) {
                 filterHolders.add(new FilterHolder(
-                        new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+                        new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
+                                pulsarService.getOpenTelemetry().getMeter())));
             }
 
             // wait until the PulsarService is ready to serve incoming requests
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 30644237a74..08041d72c7e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pulsar.broker.web;
 
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -61,6 +61,7 @@ import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.broker.web.RateLimitingFilter.Result;
 import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.LimitType;
 import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.UsageType;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -270,12 +271,29 @@ public class WebServiceTest {
     public void testRateLimiting() throws Exception {
         setupEnv(false, false, false, false, 10.0, false);
 
+        // setupEnv makes a HTTP call to create the cluster.
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+                Result.ACCEPTED.attributes, 1);
+        assertThat(metrics).noneSatisfy(metricData -> assertThat(metricData)
+                
.hasName(RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
+                .hasLongSumSatisfying(
+                        sum -> sum.hasPointsSatisfying(point -> 
point.hasAttributes(Result.REJECTED.attributes))));
+
         // Make requests without exceeding the max rate
         for (int i = 0; i < 5; i++) {
             makeHttpRequest(false, false);
             Thread.sleep(200);
         }
 
+        metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+                Result.ACCEPTED.attributes, 6);
+        assertThat(metrics).noneSatisfy(metricData -> assertThat(metricData)
+                
.hasName(RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
+                .hasLongSumSatisfying(
+                        sum -> sum.hasPointsSatisfying(point -> 
point.hasAttributes(Result.REJECTED.attributes))));
+
         try {
             for (int i = 0; i < 500; i++) {
                 makeHttpRequest(false, false);
@@ -285,6 +303,12 @@ public class WebServiceTest {
         } catch (IOException e) {
             assertTrue(e.getMessage().contains("429"));
         }
+
+        metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+                Result.ACCEPTED.attributes, value -> 
assertThat(value).isGreaterThan(6));
+        assertMetricLongSumValue(metrics, 
RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
+                Result.REJECTED.attributes, value -> 
assertThat(value).isPositive());
     }
 
     @Test
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
index be7c15dfd85..6673a89659a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
@@ -27,6 +27,8 @@ import org.apache.pulsar.opentelemetry.OpenTelemetryService;
 public class PulsarWorkerOpenTelemetry implements Closeable {
 
     public static final String SERVICE_NAME = "pulsar-function-worker";
+    public static final String INSTRUMENTATION_SCOPE_NAME = 
"org.apache.pulsar.function_worker";
+
     private final OpenTelemetryService openTelemetryService;
 
     @Getter
@@ -38,7 +40,7 @@ public class PulsarWorkerOpenTelemetry implements Closeable {
                 .serviceName(SERVICE_NAME)
                 .serviceVersion(PulsarVersion.getVersion())
                 .build();
-        meter = 
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
+        meter = 
openTelemetryService.getOpenTelemetry().getMeter(INSTRUMENTATION_SCOPE_NAME);
     }
 
     @Override
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 583d8ce558b..1d8c66a57df 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import io.opentelemetry.api.OpenTelemetry;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -30,6 +31,7 @@ import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.broker.web.JettyRequestLogFactory;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
+import org.apache.pulsar.functions.worker.PulsarWorkerOpenTelemetry;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
@@ -219,7 +221,8 @@ public class WorkerServer {
 
             if (config.isHttpRequestsLimitEnabled()) {
                 filterHolders.add(new FilterHolder(
-                        new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+                        new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
+                                
OpenTelemetry.noop().getMeter(PulsarWorkerOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))));
             }
 
             if (config.isAuthenticationEnabled()) {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
index 478b911eb23..ad94f1b65a0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.proxy.server;
 
 import static 
org.apache.pulsar.proxy.server.AdminProxyHandler.INIT_PARAM_REQUEST_BUFFER_SIZE;
+import io.opentelemetry.api.OpenTelemetry;
 import io.prometheus.client.jetty.JettyStatisticsCollector;
 import java.io.IOException;
 import java.net.URI;
@@ -37,6 +38,7 @@ import org.apache.pulsar.broker.web.JsonMapperProvider;
 import org.apache.pulsar.broker.web.RateLimitingFilter;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.apache.pulsar.jetty.tls.JettySslContextFactory;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
 import org.eclipse.jetty.server.ConnectionFactory;
 import org.eclipse.jetty.server.ConnectionLimit;
 import org.eclipse.jetty.server.Connector;
@@ -191,7 +193,8 @@ public class WebServer {
 
             if (config.isHttpRequestsLimitEnabled()) {
                 filterHolders.add(new FilterHolder(
-                        new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
+                        new 
RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
+                                
OpenTelemetry.noop().getMeter(PulsarProxyOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))));
             }
 
             if (config.isAuthenticationEnabled()) {
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
index 14bbc649466..2748e2c3df5 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
@@ -28,6 +28,8 @@ import org.apache.pulsar.proxy.server.ProxyConfiguration;
 public class PulsarProxyOpenTelemetry implements Closeable {
 
     public static final String SERVICE_NAME = "pulsar-proxy";
+    public static final String INSTRUMENTATION_SCOPE_NAME = 
"org.apache.pulsar.proxy";
+
     private final OpenTelemetryService openTelemetryService;
 
     @Getter
@@ -39,7 +41,7 @@ public class PulsarProxyOpenTelemetry implements Closeable {
                 .serviceName(SERVICE_NAME)
                 .serviceVersion(PulsarVersion.getVersion())
                 .build();
-        meter = 
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.proxy");
+        meter = 
openTelemetryService.getOpenTelemetry().getMeter(INSTRUMENTATION_SCOPE_NAME);
     }
 
     @Override

Reply via email to