This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4341f0f301e [feat][broker] PIP-264: Add broker web executor metrics
(#22816)
4341f0f301e is described below
commit 4341f0f301e0da344bb5ce07bc62c373e7ce48ef
Author: Dragos Misca <[email protected]>
AuthorDate: Wed Jun 5 16:34:56 2024 -0700
[feat][broker] PIP-264: Add broker web executor metrics (#22816)
---
.../broker/web/WebExecutorThreadPoolStats.java | 83 ++++++++++++++++++++++
.../apache/pulsar/broker/web/WebExecutorStats.java | 7 ++
.../org/apache/pulsar/broker/web/WebService.java | 5 ++
.../apache/pulsar/broker/web/WebServiceTest.java | 18 +++++
4 files changed, 113 insertions(+)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java
new file mode 100644
index 00000000000..6bfe4e33b8e
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPoolStats.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import java.util.Locale;
+
+/**
+ * Registers OpenTelemetry observable instruments that report the thread limits and the
+ * thread usage of a {@link WebExecutorThreadPool}.
+ *
+ * <p>Both instruments are callback based: the values are read from the executor whenever the
+ * registered callback is invoked. Call {@link #close()} to close both instruments.
+ */
+public class WebExecutorThreadPoolStats implements AutoCloseable {
+    // Replaces ['pulsar_web_executor_max_threads', 'pulsar_web_executor_min_threads']
+    public static final String LIMIT_COUNTER = "pulsar.web.executor.thread.limit";
+    private final ObservableLongUpDownCounter limitCounter;
+
+    // Replaces
+    // ['pulsar_web_executor_active_threads', 'pulsar_web_executor_current_threads',
+    //  'pulsar_web_executor_idle_threads']
+    public static final String USAGE_COUNTER = "pulsar.web.executor.thread.usage";
+    private final ObservableLongUpDownCounter usageCounter;
+
+    /** Attribute key distinguishing the data points recorded under {@link #LIMIT_COUNTER}. */
+    public static final AttributeKey<String> LIMIT_TYPE_KEY =
+            AttributeKey.stringKey("pulsar.web.executor.thread.limit.type");
+
+    /** Kinds of thread limit reported; the attribute value is the lower-cased constant name. */
+    @VisibleForTesting
+    enum LimitType {
+        MAX,
+        MIN;
+        // Locale.ROOT keeps the attribute value stable regardless of the JVM default locale
+        // (e.g. a Turkish default locale would otherwise lower-case 'I' to a dotless 'ı').
+        public final Attributes attributes =
+                Attributes.of(LIMIT_TYPE_KEY, name().toLowerCase(Locale.ROOT));
+    }
+
+    /** Attribute key distinguishing the data points recorded under {@link #USAGE_COUNTER}. */
+    public static final AttributeKey<String> USAGE_TYPE_KEY =
+            AttributeKey.stringKey("pulsar.web.executor.thread.usage.type");
+
+    /** Kinds of thread usage reported; the attribute value is the lower-cased constant name. */
+    @VisibleForTesting
+    enum UsageType {
+        ACTIVE,
+        CURRENT,
+        IDLE;
+        // Locale.ROOT for the same reason as in LimitType: locale-independent attribute values.
+        public final Attributes attributes =
+                Attributes.of(USAGE_TYPE_KEY, name().toLowerCase(Locale.ROOT));
+    }
+
+    /**
+     * Builds the limit and usage instruments on the given meter.
+     *
+     * @param meter the meter used to build both up-down counters
+     * @param executor the thread pool whose limits and usage are read inside the callbacks
+     */
+    public WebExecutorThreadPoolStats(Meter meter, WebExecutorThreadPool executor) {
+        limitCounter = meter
+                .upDownCounterBuilder(LIMIT_COUNTER)
+                .setUnit("{thread}")
+                .setDescription("The thread limits for the pulsar-web executor pool.")
+                .buildWithCallback(measurement -> {
+                    measurement.record(executor.getMaxThreads(), LimitType.MAX.attributes);
+                    measurement.record(executor.getMinThreads(), LimitType.MIN.attributes);
+                });
+        usageCounter = meter
+                .upDownCounterBuilder(USAGE_COUNTER)
+                .setUnit("{thread}")
+                .setDescription("The current usage of threads in the pulsar-web executor pool.")
+                .buildWithCallback(measurement -> {
+                    // Read each value once so that ACTIVE is derived from the same numbers
+                    // that are reported for IDLE and CURRENT.
+                    var idleThreads = executor.getIdleThreads();
+                    var currentThreads = executor.getThreads();
+                    measurement.record(idleThreads, UsageType.IDLE.attributes);
+                    measurement.record(currentThreads, UsageType.CURRENT.attributes);
+                    measurement.record(currentThreads - idleThreads, UsageType.ACTIVE.attributes);
+                });
+    }
+
+    /** Closes both instruments; the usage counter is closed even if closing the limit counter fails. */
+    @Override
+    public synchronized void close() {
+        try {
+            limitCounter.close();
+        } finally {
+            usageCounter.close();
+        }
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
index 585df813027..28cfa7430cb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebExecutorStats.java
@@ -21,14 +21,21 @@ package org.apache.pulsar.broker.web;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
+@Deprecated
class WebExecutorStats implements AutoCloseable {
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
+ @PulsarDeprecatedMetric(newMetricName =
WebExecutorThreadPoolStats.LIMIT_COUNTER)
private final Gauge maxThreads;
+ @PulsarDeprecatedMetric(newMetricName =
WebExecutorThreadPoolStats.LIMIT_COUNTER)
private final Gauge minThreads;
+ @PulsarDeprecatedMetric(newMetricName =
WebExecutorThreadPoolStats.USAGE_COUNTER)
private final Gauge idleThreads;
+ @PulsarDeprecatedMetric(newMetricName =
WebExecutorThreadPoolStats.USAGE_COUNTER)
private final Gauge activeThreads;
+ @PulsarDeprecatedMetric(newMetricName =
WebExecutorThreadPoolStats.USAGE_COUNTER)
private final Gauge currentThreads;
private final WebExecutorThreadPool executor;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 9a439268a8b..bf484d4f41f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -75,7 +75,9 @@ public class WebService implements AutoCloseable {
private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers;
+ @Deprecated
private final WebExecutorStats executorStats;
+ private final WebExecutorThreadPoolStats webExecutorThreadPoolStats;
private final WebExecutorThreadPool webServiceExecutor;
private final ServerConnector httpConnector;
@@ -101,6 +103,8 @@ public class WebService implements AutoCloseable {
"pulsar-web",
config.getHttpServerThreadPoolQueueSize());
this.executorStats = WebExecutorStats.getStats(webServiceExecutor);
+ this.webExecutorThreadPoolStats =
+ new
WebExecutorThreadPoolStats(pulsar.getOpenTelemetry().getMeter(),
webServiceExecutor);
this.server = new Server(webServiceExecutor);
if (config.getMaxHttpServerConnections() > 0) {
server.addBean(new
ConnectionLimit(config.getMaxHttpServerConnections(), server));
@@ -376,6 +380,7 @@ public class WebService implements AutoCloseable {
jettyStatisticsCollector = null;
}
webServiceExecutor.join();
+ webExecutorThreadPoolStats.close();
this.executorStats.close();
log.info("Web service closed");
} catch (Exception e) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 17588a7ecac..30644237a74 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.pulsar.broker.web;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -59,6 +61,8 @@ import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.LimitType;
+import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.UsageType;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -106,6 +110,19 @@ public class WebServiceTest {
@Test
public void testWebExecutorMetrics() throws Exception {
setupEnv(true, false, false, false, -1, false);
+
+ var otelMetrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MAX.attributes,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
WebExecutorThreadPoolStats.LIMIT_COUNTER, LimitType.MIN.attributes,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.ACTIVE.attributes,
+ value -> assertThat(value).isNotNegative());
+ assertMetricLongSumValue(otelMetrics,
WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.CURRENT.attributes,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
WebExecutorThreadPoolStats.USAGE_COUNTER, UsageType.IDLE.attributes,
+ value -> assertThat(value).isNotNegative());
+
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, false, false, false,
statsOut);
String metricsStr = statsOut.toString();
@@ -498,6 +515,7 @@ public class WebServiceTest {
pulsarTestContext = PulsarTestContext.builder()
.spyByDefault()
.config(config)
+ .enableOpenTelemetry(true)
.build();
pulsar = pulsarTestContext.getPulsarService();