This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8957e353ded [feat][misc] PIP-320: Add OpenTelemetry scaffolding (#22010)
8957e353ded is described below
commit 8957e353ded9ee24eccea349c7747da721d9e66a
Author: Dragos Misca <[email protected]>
AuthorDate: Fri Feb 9 15:40:20 2024 -0800
[feat][misc] PIP-320: Add OpenTelemetry scaffolding (#22010)
Co-authored-by: Matteo Merli <[email protected]>
---
.github/workflows/pulsar-ci.yaml | 3 +
build/run_integration_group.sh | 4 +
distribution/server/src/assemble/LICENSE.bin.txt | 28 +++
pom.xml | 31 ++++
.../ProxySaslAuthenticationTest.java | 1 +
pulsar-broker-common/pom.xml | 18 ++
.../stats/prometheus/PrometheusMetricsClient.java | 0
pulsar-broker/pom.xml | 14 ++
.../org/apache/pulsar/broker/PulsarService.java | 6 +
.../broker/stats/PulsarBrokerOpenTelemetry.java | 49 +++++
pulsar-functions/worker/pom.xml | 6 +
.../worker/PulsarWorkerOpenTelemetry.java | 48 +++++
.../functions/worker/PulsarWorkerService.java | 6 +
.../worker/FunctionAssignmentTailerTest.java | 5 +
.../pom.xml | 88 ++++-----
.../opentelemetry/OpenTelemetryAttributes.java | 32 ++++
.../pulsar/opentelemetry/OpenTelemetryService.java | 108 +++++++++++
.../apache/pulsar/opentelemetry/package-info.java | 24 +++
.../opentelemetry/OpenTelemetryServiceTest.java | 201 +++++++++++++++++++++
pulsar-proxy/pom.xml | 6 +
.../apache/pulsar/proxy/server/ProxyService.java | 7 +
.../proxy/stats/PulsarProxyOpenTelemetry.java | 49 +++++
.../extensions/SimpleProxyExtensionTestBase.java | 1 +
.../server/AdminProxyHandlerKeystoreTLSTest.java | 1 +
.../proxy/server/AuthedAdminProxyHandlerTest.java | 1 +
.../proxy/server/ProxyAdditionalServletTest.java | 1 +
.../ProxyAuthenticatedProducerConsumerTest.java | 1 +
.../proxy/server/ProxyAuthenticationTest.java | 4 +-
.../server/ProxyConnectionThrottlingTest.java | 1 +
.../server/ProxyEnableHAProxyProtocolTest.java | 1 +
.../proxy/server/ProxyForwardAuthDataTest.java | 4 +-
.../server/ProxyKeyStoreTlsTransportTest.java | 1 +
.../proxy/server/ProxyKeyStoreTlsWithAuthTest.java | 1 +
.../server/ProxyKeyStoreTlsWithoutAuthTest.java | 1 +
.../proxy/server/ProxyLookupThrottlingTest.java | 1 +
.../pulsar/proxy/server/ProxyMutualTlsTest.java | 1 +
.../pulsar/proxy/server/ProxyParserTest.java | 1 +
.../pulsar/proxy/server/ProxyRefreshAuthTest.java | 4 +-
.../proxy/server/ProxyRolesEnforcementTest.java | 4 +-
.../ProxyServiceStarterDisableZeroCopyTest.java | 3 +-
.../proxy/server/ProxyServiceStarterTest.java | 1 +
.../proxy/server/ProxyServiceTlsStarterTest.java | 1 +
.../apache/pulsar/proxy/server/ProxyStatsTest.java | 1 +
.../proxy/server/ProxyStuckConnectionTest.java | 1 +
.../org/apache/pulsar/proxy/server/ProxyTest.java | 1 +
.../apache/pulsar/proxy/server/ProxyTlsTest.java | 1 +
.../pulsar/proxy/server/ProxyTlsWithAuthTest.java | 1 +
.../server/ProxyWithAuthorizationNegTest.java | 4 +-
.../proxy/server/ProxyWithAuthorizationTest.java | 5 +-
.../server/ProxyWithExtensibleLoadManagerTest.java | 1 +
.../server/ProxyWithJwtAuthorizationTest.java | 4 +-
.../server/ProxyWithoutServiceDiscoveryTest.java | 5 +-
.../SuperUserAuthedAdminProxyHandlerTest.java | 1 +
.../server/UnauthedAdminProxyHandlerTest.java | 1 +
tests/integration/pom.xml | 20 +-
.../OpenTelemetryCollectorContainer.java | 63 +++++++
.../integration/containers/PulsarContainer.java | 2 +
.../metrics/OpenTelemetrySanityTest.java | 165 +++++++++++++++++
.../integration/topologies/PulsarCluster.java | 43 +++--
.../integration/topologies/PulsarClusterSpec.java | 17 ++
.../containers/otel-collector-config.yaml | 43 +++++
.../src/test/resources/pulsar-metrics.xml | 28 +++
tests/integration/src/test/resources/pulsar.xml | 1 +
63 files changed, 1100 insertions(+), 76 deletions(-)
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 7767beaa9aa..effeab90beb 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -589,6 +589,9 @@ jobs:
- name: Transaction
group: TRANSACTION
+ - name: Metrics
+ group: METRICS
+
steps:
- name: checkout
uses: actions/checkout@v4
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index f20a7ad0793..2d82fce0887 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -181,6 +181,10 @@ test_group_transaction() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
}
+test_group_metrics() {
+ mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-metrics.xml -DintegrationTests
+}
+
test_group_tiered_filesystem() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=tiered-filesystem-storage.xml -DintegrationTests
}
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index e3941c54a74..7c95811faf7 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -333,6 +333,13 @@ The Apache Software License, Version 2.0
- io.prometheus-simpleclient_tracer_common-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel-0.16.0.jar
- io.prometheus-simpleclient_tracer_otel_agent-0.16.0.jar
+ * Prometheus exporter
+ - io.prometheus-prometheus-metrics-config-1.1.0.jar
+ - io.prometheus-prometheus-metrics-exporter-common-1.1.0.jar
+ - io.prometheus-prometheus-metrics-exporter-httpserver-1.1.0.jar
+ - io.prometheus-prometheus-metrics-exposition-formats-1.1.0.jar
+ - io.prometheus-prometheus-metrics-model-1.1.0.jar
+ - io.prometheus-prometheus-metrics-shaded-protobuf-1.1.0.jar
* Jakarta Bean Validation API
- jakarta.validation-jakarta.validation-api-2.0.2.jar
- javax.validation-validation-api-1.1.0.Final.jar
@@ -503,6 +510,27 @@ The Apache Software License, Version 2.0
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-0.9.44.jar
- org.roaringbitmap-shims-0.9.44.jar
+ * OpenTelemetry
+ - io.opentelemetry-opentelemetry-api-1.34.1.jar
+ - io.opentelemetry-opentelemetry-api-events-1.34.1-alpha.jar
+ - io.opentelemetry-opentelemetry-context-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-common-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-otlp-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-otlp-common-1.34.1.jar
+ - io.opentelemetry-opentelemetry-exporter-prometheus-1.34.1-alpha.jar
+ - io.opentelemetry-opentelemetry-exporter-sender-okhttp-1.34.1.jar
+ - io.opentelemetry-opentelemetry-extension-incubator-1.34.1-alpha.jar
+ - io.opentelemetry-opentelemetry-sdk-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-common-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-extension-autoconfigure-spi-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-logs-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-metrics-1.34.1.jar
+ - io.opentelemetry-opentelemetry-sdk-trace-1.34.1.jar
+ - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-1.32.1.jar
+ - io.opentelemetry.instrumentation-opentelemetry-instrumentation-api-semconv-1.32.1-alpha.jar
+ - io.opentelemetry.instrumentation-opentelemetry-resources-1.32.1-alpha.jar
+ - io.opentelemetry.semconv-opentelemetry-semconv-1.23.1-alpha.jar
BSD 3-clause "New" or "Revised" License
* Google auth library
diff --git a/pom.xml b/pom.xml
index 4dfeb30821a..52a638ac09f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,6 +249,10 @@ flexible messaging model and an intuitive client
API.</description>
<disruptor.version>3.4.3</disruptor.version>
<zstd-jni.version>1.5.2-3</zstd-jni.version>
<netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
+ <opentelemetry.version>1.34.1</opentelemetry.version>
+ <opentelemetry.alpha.version>1.34.1-alpha</opentelemetry.alpha.version>
+ <opentelemetry.instrumentation.version>1.32.1-alpha</opentelemetry.instrumentation.version>
+ <opentelemetry.semconv.version>1.23.1-alpha</opentelemetry.semconv.version>
<!-- test dependencies -->
<testcontainers.version>1.18.3</testcontainers.version>
@@ -1446,6 +1450,31 @@ flexible messaging model and an intuitive client
API.</description>
<version>${restassured.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-bom</artifactId>
+ <version>${opentelemetry.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-bom-alpha</artifactId>
+ <version>${opentelemetry.alpha.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.instrumentation</groupId>
+ <artifactId>opentelemetry-resources</artifactId>
+ <version>${opentelemetry.instrumentation.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.semconv</groupId>
+ <artifactId>opentelemetry-semconv</artifactId>
+ <version>${opentelemetry.semconv.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -2266,6 +2295,7 @@ flexible messaging model and an intuitive client
API.</description>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
<module>pulsar-config-validation</module>
+ <module>pulsar-opentelemetry</module>
<module>structured-event-log</module>
@@ -2330,6 +2360,7 @@ flexible messaging model and an intuitive client
API.</description>
<module>pulsar-broker-auth-sasl</module>
<module>pulsar-client-auth-sasl</module>
<module>pulsar-config-validation</module>
+ <module>pulsar-opentelemetry</module>
<!-- transaction related modules -->
<module>pulsar-transaction</module>
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
index f6ad76a083b..a27384c9890 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/ProxySaslAuthenticationTest.java
@@ -242,6 +242,7 @@ public class ProxySaslAuthenticationTest extends
ProducerConsumerBase {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setSaslJaasClientAllowedIds(".*" + localHostname +
".*");
proxyConfig.setSaslJaasServerSectionName("PulsarProxy");
+ proxyConfig.setClusterName(configClusterName);
// proxy connect to broker
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index d73dba288a3..8e942c78d5b 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -82,10 +82,28 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
similarity index 100%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
rename to
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsClient.java
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index c39de184b05..18da38b43dc 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -143,6 +143,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-opentelemetry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
@@ -209,6 +215,14 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!-- functions related dependencies (end) -->
<dependency>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 054411c49f6..3701f354b62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -108,6 +108,7 @@ import
org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import
org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
@@ -248,6 +249,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private final Timer brokerClientSharedTimer;
private MetricsGenerator metricsGenerator;
+ private PulsarBrokerOpenTelemetry openTelemetry;
private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
@@ -461,6 +463,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
resetMetricsServlet();
+ if (openTelemetry != null) {
+ openTelemetry.close();
+ }
if (this.compactionServiceFactory != null) {
try {
@@ -897,6 +902,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
this.metricsGenerator = new MetricsGenerator(this);
+ this.openTelemetry = new PulsarBrokerOpenTelemetry(config);
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
new file mode 100644
index 00000000000..4b76b993001
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+
+public class PulsarBrokerOpenTelemetry implements Closeable {
+
+ public static final String SERVICE_NAME = "pulsar-broker";
+ private final OpenTelemetryService openTelemetryService;
+
+ @Getter
+ private final Meter meter;
+
+ public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+ openTelemetryService = OpenTelemetryService.builder()
+ .clusterName(config.getClusterName())
+ .serviceName(SERVICE_NAME)
+ .serviceVersion(PulsarVersion.getVersion())
+ .build();
+ meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
+ }
+
+ @Override
+ public void close() {
+ openTelemetryService.close();
+ }
+}
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index cd89bacbf9e..bb93eeb98d7 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -46,6 +46,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-opentelemetry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
new file mode 100644
index 00000000000..be7c15dfd85
--- /dev/null
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerOpenTelemetry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+
+public class PulsarWorkerOpenTelemetry implements Closeable {
+
+ public static final String SERVICE_NAME = "pulsar-function-worker";
+ private final OpenTelemetryService openTelemetryService;
+
+ @Getter
+ private final Meter meter;
+
+ public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) {
+ openTelemetryService = OpenTelemetryService.builder()
+ .clusterName(workerConfig.getPulsarFunctionsCluster())
+ .serviceName(SERVICE_NAME)
+ .serviceVersion(PulsarVersion.getVersion())
+ .build();
+ meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
+ }
+
+ @Override
+ public void close() {
+ openTelemetryService.close();
+ }
+}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 16cf778e072..9f7d1996e0b 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -108,6 +108,7 @@ public class PulsarWorkerService implements WorkerService {
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
private MetricsGenerator metricsGenerator;
+ private PulsarWorkerOpenTelemetry openTelemetry;
@VisibleForTesting
private URI dlogUri;
private LeaderService leaderService;
@@ -188,6 +189,7 @@ public class PulsarWorkerService implements WorkerService {
this.statsUpdater = Executors
.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("worker-stats-updater"));
this.metricsGenerator = new MetricsGenerator(this.statsUpdater,
workerConfig);
+ this.openTelemetry = new PulsarWorkerOpenTelemetry(workerConfig);
this.workerConfig = workerConfig;
this.dlogUri = dlogUri;
this.workerStatsManager = new WorkerStatsManager(workerConfig,
runAsStandalone);
@@ -659,6 +661,10 @@ public class PulsarWorkerService implements WorkerService {
if (null != stateStoreProvider) {
stateStoreProvider.close();
}
+
+ if (null != openTelemetry) {
+ openTelemetry.close();
+ }
}
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
index 022ebd6ba48..c78c68f8923 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailerTest.java
@@ -60,6 +60,8 @@ import org.testng.annotations.Test;
@Slf4j
public class FunctionAssignmentTailerTest {
+ private static final String CLUSTER_NAME = "test-cluster";
+
@Test(timeOut = 10000)
public void testErrorNotifier() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
@@ -71,6 +73,7 @@ public class FunctionAssignmentTailerTest {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
+ workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
Function.FunctionMetaData function1 =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
@@ -183,6 +186,7 @@ public class FunctionAssignmentTailerTest {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
+ workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
Function.FunctionMetaData function1 =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
@@ -307,6 +311,7 @@ public class FunctionAssignmentTailerTest {
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
+ workerConfig.setPulsarFunctionsCluster(CLUSTER_NAME);
Function.FunctionMetaData function1 =
Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
diff --git a/pulsar-broker-common/pom.xml b/pulsar-opentelemetry/pom.xml
similarity index 59%
copy from pulsar-broker-common/pom.xml
copy to pulsar-opentelemetry/pom.xml
index d73dba288a3..82a9658cc9d 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-opentelemetry/pom.xml
@@ -20,8 +20,8 @@
-->
<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
@@ -29,51 +29,58 @@
<version>3.3.0-SNAPSHOT</version>
</parent>
- <artifactId>pulsar-broker-common</artifactId>
- <description>Common classes used in multiple broker modules</description>
+ <artifactId>pulsar-opentelemetry</artifactId>
+ <description>OpenTelemetry Integration</description>
<dependencies>
+ <!-- OpenTelemetry dependencies -->
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-metadata</artifactId>
- <version>${project.version}</version>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
-
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-exporter-prometheus</artifactId>
</dependency>
-
<dependency>
- <groupId>io.prometheus</groupId>
- <artifactId>simpleclient_jetty</artifactId>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk</artifactId>
</dependency>
-
<dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>javax.servlet-api</artifactId>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
-
<dependency>
- <groupId>javax.ws.rs</groupId>
- <artifactId>javax.ws.rs-api</artifactId>
+ <groupId>io.opentelemetry.instrumentation</groupId>
+ <artifactId>opentelemetry-resources</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry.semconv</groupId>
+ <artifactId>opentelemetry-semconv</artifactId>
</dependency>
<dependency>
- <groupId>io.jsonwebtoken</groupId>
- <artifactId>jjwt-impl</artifactId>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
</dependency>
<dependency>
- <groupId>io.jsonwebtoken</groupId>
- <artifactId>jjwt-jackson</artifactId>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
</dependency>
<!-- test -->
<dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bc-fips</artifactId>
- <version>${bouncycastle.bc-fips.version}</version>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-broker-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
@@ -82,6 +89,12 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -117,29 +130,6 @@
</execution>
</executions>
</plugin>
-
- <plugin>
- <artifactId>maven-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>test-compile</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.testOutputDirectory}/certificate-authority</outputDirectory>
- <overwrite>true</overwrite>
- <resources>
- <resource>
- <directory>${project.parent.basedir}/tests/certificate-authority</directory>
- <filtering>false</filtering>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
</project>
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
new file mode 100644
index 00000000000..bdb002cb359
--- /dev/null
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import io.opentelemetry.api.common.AttributeKey;
+
+/**
+ * Common OpenTelemetry attributes to be used by Pulsar components.
+ */
+public interface OpenTelemetryAttributes {
+ /**
+ * The name of the Pulsar cluster. This attribute is automatically added to all signals by
+ * {@link OpenTelemetryService}.
+ */
+ AttributeKey<String> PULSAR_CLUSTER = AttributeKey.stringKey("pulsar.cluster");
+}
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
new file mode 100644
index 00000000000..5ead1ff265c
--- /dev/null
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.semconv.ResourceAttributes;
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import lombok.Builder;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry
+ * SDK with a set of override properties. Once initialized, furnishes access to OpenTelemetry.
+ */
+public class OpenTelemetryService implements Closeable {
+
+ static final String OTEL_SDK_DISABLED_KEY = "otel.sdk.disabled";
+ static final int MAX_CARDINALITY_LIMIT = 10000;
+
+ private final OpenTelemetrySdk openTelemetrySdk;
+
+ /**
+ * Instantiates the OpenTelemetry SDK. All attributes are overridden by system properties or environment
+ * variables.
+ *
+ * @param clusterName
+ * The name of the Pulsar cluster. Cannot be null or blank.
+ * @param serviceName
+ * The name of the service. Optional.
+ * @param serviceVersion
+ * The version of the service. Optional.
+ * @param sdkBuilderConsumer
+ * Allows customizing the SDK builder; for testing purposes only.
+ */
+ @Builder
+ public OpenTelemetryService(String clusterName,
+ String serviceName,
+ String serviceVersion,
+ @VisibleForTesting Consumer<AutoConfiguredOpenTelemetrySdkBuilder> sdkBuilderConsumer) {
+ checkArgument(StringUtils.isNotBlank(clusterName), "Cluster name cannot be empty");
+ var sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder();
+
+ sdkBuilder.addPropertiesSupplier(() -> Map.of(
+ OTEL_SDK_DISABLED_KEY, "true",
+ // Cardinality limit includes the overflow attribute set, so we need to add 1.
+ "otel.experimental.metrics.cardinality.limit", Integer.toString(MAX_CARDINALITY_LIMIT + 1)
+ ));
+
+ sdkBuilder.addResourceCustomizer(
+ (resource, __) -> {
+ var resourceBuilder = Resource.builder();
+ // Do not override attributes if already set (via system properties or environment variables).
+ if (resource.getAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER) == null) {
+ resourceBuilder.put(OpenTelemetryAttributes.PULSAR_CLUSTER, clusterName);
+ }
+ if (StringUtils.isNotBlank(serviceName)
+ && Objects.equals(Resource.getDefault().getAttribute(ResourceAttributes.SERVICE_NAME),
+ resource.getAttribute(ResourceAttributes.SERVICE_NAME))) {
+ resourceBuilder.put(ResourceAttributes.SERVICE_NAME, serviceName);
+ }
+ if (StringUtils.isNotBlank(serviceVersion)
+ && resource.getAttribute(ResourceAttributes.SERVICE_VERSION) == null) {
+ resourceBuilder.put(ResourceAttributes.SERVICE_VERSION, serviceVersion);
+ }
+ return resource.merge(resourceBuilder.build());
+ });
+
+ if (sdkBuilderConsumer != null) {
+ sdkBuilderConsumer.accept(sdkBuilder);
+ }
+
+ openTelemetrySdk = sdkBuilder.build().getOpenTelemetrySdk();
+ }
+
+ public OpenTelemetry getOpenTelemetry() {
+ return openTelemetrySdk;
+ }
+
+ @Override
+ public void close() {
+ openTelemetrySdk.close();
+ }
+}
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
new file mode 100644
index 00000000000..9a7426aa047
--- /dev/null
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides a wrapper layer for the OpenTelemetry API to be used in Pulsar.
+ * @since 3.3.0
+ */
+package org.apache.pulsar.opentelemetry;
\ No newline at end of file
diff --git
a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
new file mode 100644
index 00000000000..e5c893794a0
--- /dev/null
+++
b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounterBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.instrumentation.resources.JarServiceNameDetector;
+import
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.export.MetricReader;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import io.opentelemetry.semconv.ResourceAttributes;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import lombok.Cleanup;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.assertj.core.api.AbstractCharSequenceAssert;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryServiceTest {
+
+ private OpenTelemetryService openTelemetryService;
+ private InMemoryMetricReader reader;
+ private Meter meter;
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ reader = InMemoryMetricReader.create();
+ openTelemetryService = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(reader,
+ Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY,
"false"))).
+ clusterName("openTelemetryServiceTestCluster").
+ build();
+ meter =
openTelemetryService.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
+ }
+
+ @AfterMethod
+ public void teardown() throws Exception {
+ openTelemetryService.close();
+ reader.close();
+ }
+
+ // Customizes the SDK builder to include the MetricReader and extra
properties for testing purposes.
+ private static Consumer<AutoConfiguredOpenTelemetrySdkBuilder>
getSdkBuilderConsumer(MetricReader extraReader,
+
Map<String, String> extraProperties) {
+ return autoConfigurationCustomizer -> {
+ if (extraReader != null) {
+ autoConfigurationCustomizer.addMeterProviderCustomizer(
+ (sdkMeterProviderBuilder, __) ->
sdkMeterProviderBuilder.registerMetricReader(extraReader));
+ }
+ autoConfigurationCustomizer.addPropertiesSupplier(() ->
extraProperties);
+ };
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testClusterNameCannotBeNull() {
+ @Cleanup
+ var ots = OpenTelemetryService.builder().build();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testClusterNameCannotBeEmpty() {
+ @Cleanup
+ var ots =
OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build();
+ }
+
+ @Test
+ public void testResourceAttributesAreSet() throws Exception {
+ @Cleanup
+ var reader = InMemoryMetricReader.create();
+
+ @Cleanup
+ var ots = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(reader,
+ Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY,
"false",
+ "otel.java.disabled.resource.providers",
JarServiceNameDetector.class.getName()))).
+ clusterName("testServiceNameAndVersion").
+ serviceName("openTelemetryServiceTestService").
+ serviceVersion("1.0.0").
+ build();
+
+ assertThat(reader.collectAllMetrics())
+ .allSatisfy(metric -> assertThat(metric)
+ .hasResourceSatisfying(resource -> resource
+ .hasAttribute(OpenTelemetryAttributes.PULSAR_CLUSTER,
"testServiceNameAndVersion")
+ .hasAttribute(ResourceAttributes.SERVICE_NAME,
"openTelemetryServiceTestService")
+ .hasAttribute(ResourceAttributes.SERVICE_VERSION, "1.0.0")
+ .hasAttribute(satisfies(ResourceAttributes.HOST_NAME,
AbstractCharSequenceAssert::isNotBlank))));
+ }
+
+ @Test
+ public void testIsInstrumentationNameSetOnMeter() {
+ var meter =
openTelemetryService.getOpenTelemetry().getMeter("testInstrumentationScope");
+ meter.counterBuilder("dummyCounter").build().add(1);
+ assertThat(reader.collectAllMetrics())
+ .anySatisfy(metricData -> assertThat(metricData)
+
.hasInstrumentationScope(InstrumentationScopeInfo.create("testInstrumentationScope")));
+ }
+
+ @Test
+ public void testMetricCardinalityIsSet() {
+ var prometheusExporterPort = 9464;
+ @Cleanup
+ var ots = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(null,
+ Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY,
"false",
+ "otel.metrics.exporter", "prometheus",
+ "otel.exporter.prometheus.port",
Integer.toString(prometheusExporterPort)))).
+ clusterName("openTelemetryServiceCardinalityTestCluster").
+ build();
+ var meter =
ots.getOpenTelemetry().getMeter("openTelemetryMetricCardinalityTest");
+ var counter = meter.counterBuilder("dummyCounter").build();
+ for (int i = 0; i < OpenTelemetryService.MAX_CARDINALITY_LIMIT + 100;
i++) {
+ counter.add(1, Attributes.of(AttributeKey.stringKey("attribute"),
"value" + i));
+ }
+
+ Awaitility.waitAtMost(30,
TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+ var client = new PrometheusMetricsClient("localhost",
prometheusExporterPort);
+ var allMetrics = client.getMetrics();
+ var actualMetrics =
allMetrics.findByNameAndLabels("dummyCounter_total");
+ var overflowMetric =
allMetrics.findByNameAndLabels("dummyCounter_total", "otel_metric_overflow",
"true");
+ return actualMetrics.size() ==
OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1 && overflowMetric.size() == 1;
+ });
+ }
+
+ @Test
+ public void testLongCounter() {
+ var longCounter = meter.counterBuilder("dummyLongCounter").build();
+ var attributes = Attributes.of(AttributeKey.stringKey("dummyAttr"),
"dummyValue");
+ longCounter.add(1, attributes);
+ longCounter.add(2, attributes);
+
+ assertThat(reader.collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+ .hasName("dummyLongCounter")
+ .hasLongSumSatisfying(sum -> sum
+ .hasPointsSatisfying(point -> point
+ .hasAttributes(attributes)
+ .hasValue(3))));
+ }
+
+ @Test
+ public void testServiceIsDisabledByDefault() throws Exception {
+ @Cleanup
+ var metricReader = InMemoryMetricReader.create();
+
+ @Cleanup
+ var ots = OpenTelemetryService.builder().
+ sdkBuilderConsumer(getSdkBuilderConsumer(metricReader,
Map.of())).
+ clusterName("openTelemetryServiceTestCluster").
+ build();
+ var meter =
ots.getOpenTelemetry().getMeter("openTelemetryServiceTestInstrument");
+
+ var builders = List.of(
+ meter.counterBuilder("dummyCounterA"),
+ meter.counterBuilder("dummyCounterB").setDescription("desc"),
+
meter.counterBuilder("dummyCounterC").setDescription("desc").setUnit("unit"),
+ meter.counterBuilder("dummyCounterD").setUnit("unit")
+ );
+
+ var callback = new AtomicBoolean();
+ // Validate that no matter how the counters are being built, they are
all backed by the same underlying object.
+ // This ensures we conserve memory when the SDK is disabled.
+
assertThat(builders.stream().map(LongCounterBuilder::build).distinct()).hasSize(1);
+
assertThat(builders.stream().map(LongCounterBuilder::buildObserver).distinct()).hasSize(1);
+ assertThat(builders.stream().map(b -> b.buildWithCallback(__ ->
callback.set(true))).distinct()).hasSize(1);
+
+ // Validate that no metrics are being emitted at all.
+ assertThat(metricReader.collectAllMetrics()).isEmpty();
+
+ // Validate that the callback has not been called.
+ assertThat(callback).isFalse();
+ }
+}
diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml
index 8fb1313f9ce..55dfd11e40e 100644
--- a/pulsar-proxy/pom.xml
+++ b/pulsar-proxy/pom.xml
@@ -49,6 +49,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-opentelemetry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-docs-tools</artifactId>
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 719c7c2cbda..61b00871cec 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.proxy.extensions.ProxyExtensions;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
import org.apache.pulsar.proxy.stats.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,6 +149,8 @@ public class ProxyService implements Closeable {
private PrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
+ @Getter
+ private PulsarProxyOpenTelemetry openTelemetry;
@Getter
private final ConnectionController connectionController;
@@ -284,6 +287,7 @@ public class ProxyService implements Closeable {
}
createMetricsServlet();
+ openTelemetry = new PulsarProxyOpenTelemetry(proxyConfig);
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
@@ -399,6 +403,9 @@ public class ProxyService implements Closeable {
proxyAdditionalServlets = null;
}
+ if (openTelemetry != null) {
+ openTelemetry.close();
+ }
resetMetricsServlet();
if (localMetadataStore != null) {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
new file mode 100644
index 00000000000..14bbc649466
--- /dev/null
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.stats;
+
+import io.opentelemetry.api.metrics.Meter;
+import java.io.Closeable;
+import lombok.Getter;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.opentelemetry.OpenTelemetryService;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+
+public class PulsarProxyOpenTelemetry implements Closeable {
+
+ public static final String SERVICE_NAME = "pulsar-proxy";
+ private final OpenTelemetryService openTelemetryService;
+
+ @Getter
+ private final Meter meter;
+
+ public PulsarProxyOpenTelemetry(ProxyConfiguration config) {
+ openTelemetryService = OpenTelemetryService.builder()
+ .clusterName(config.getClusterName())
+ .serviceName(SERVICE_NAME)
+ .serviceVersion(PulsarVersion.getVersion())
+ .build();
+ meter =
openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.proxy");
+ }
+
+ @Override
+ public void close() {
+ openTelemetryService.close();
+ }
+}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
index fde7c938d0a..f9ace716ecd 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java
@@ -140,6 +140,7 @@ public abstract class SimpleProxyExtensionTestBase extends
MockedPulsarServiceBa
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
index bc2029861f4..92c644b470d 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AdminProxyHandlerKeystoreTLSTest.java
@@ -101,6 +101,7 @@ public class AdminProxyHandlerKeystoreTLSTest extends
MockedPulsarServiceBaseTes
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationKeyStoreTls.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(String.format("keyStoreType:%s,keyStorePath:%s,keyStorePassword:%s",
KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
+ proxyConfig.setClusterName(configClusterName);
resource = new PulsarResources(registerCloseable(new
ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
index d83de9652cf..ef58648e35a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/AuthedAdminProxyHandlerTest.java
@@ -85,6 +85,7 @@ public class AuthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
proxyConfig.setHttpMaxRequestHeaderSize(20000);
+ proxyConfig.setClusterName(configClusterName);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
index 9f8efa1ec79..f61a73bbf91 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAdditionalServletTest.java
@@ -78,6 +78,7 @@ public class ProxyAdditionalServletTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
// enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.of(2));
+ proxyConfig.setClusterName(configClusterName);
// this is for nar package test
// addServletNar();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 1c93cb20c70..4083c984d98 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -137,6 +137,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends
ProducerConsumerBase
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index fec0673ff9b..662b8305c0e 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -58,6 +58,7 @@ import org.testng.annotations.Test;
public class ProxyAuthenticationTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+ private static final String CLUSTER_NAME = "test";
public static class BasicAuthenticationData implements
AuthenticationDataProvider {
private final String authParam;
@@ -178,7 +179,7 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
providers.add(BasicAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("test");
+ conf.setClusterName(CLUSTER_NAME);
Set<String> proxyRoles = new HashSet<>();
proxyRoles.add("proxy");
conf.setProxyRoles(proxyRoles);
@@ -222,6 +223,7 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index a070d1e84d3..78ab9bd0d95 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -59,6 +59,7 @@ public class ProxyConnectionThrottlingTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyConfig.setMaxConcurrentInboundConnectionsPerIp(NUM_CONCURRENT_INBOUND_CONNECTION);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
index 5704ba55fed..413774daf2c 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyEnableHAProxyProtocolTest.java
@@ -60,6 +60,7 @@ public class ProxyEnableHAProxyProtocolTest extends
MockedPulsarServiceBaseTest
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setHaProxyProtocolEnabled(true);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
index 477fe597f26..5e969ca26e4 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
public class ProxyForwardAuthDataTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(ProxyForwardAuthDataTest.class);
+ private static final String CLUSTER_NAME = "test";
@BeforeMethod
@Override
@@ -64,7 +65,7 @@ public class ProxyForwardAuthDataTest extends
ProducerConsumerBase {
providers.add(BasicAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("test");
+ conf.setClusterName(CLUSTER_NAME);
Set<String> proxyRoles = new HashSet<String>();
proxyRoles.add("proxy");
conf.setProxyRoles(proxyRoles);
@@ -109,6 +110,7 @@ public class ProxyForwardAuthDataTest extends
ProducerConsumerBase {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+ proxyConfig.setClusterName(CLUSTER_NAME);
Set<String> providers = new HashSet<>();
providers.add(BasicAuthenticationProvider.class.getName());
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
index 5ee03395b80..5671c527f68 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsTransportTest.java
@@ -77,6 +77,7 @@ public class ProxyKeyStoreTlsTransportTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyConfig.setTlsRequireTrustedClientCertOnConnect(false);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
index 1f21281a6f6..99fb8c03a81 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithAuthTest.java
@@ -77,6 +77,7 @@ public class ProxyKeyStoreTlsWithAuthTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
// config for authentication and authorization.
proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
index d7935755ce0..1dcebda7935 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyKeyStoreTlsWithoutAuthTest.java
@@ -74,6 +74,7 @@ public class ProxyKeyStoreTlsWithoutAuthTest extends
MockedPulsarServiceBaseTest
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 167c3b19646..a9017404d0e 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -65,6 +65,7 @@ public class ProxyLookupThrottlingTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
+ proxyConfig.setClusterName(configClusterName);
AuthenticationService authenticationService = new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
index 08066f2e5bf..fae44c00ada 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyMutualTlsTest.java
@@ -66,6 +66,7 @@ public class ProxyMutualTlsTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setTlsRequireTrustedClientCertOnConnect(true);
proxyConfig.setTlsAllowInsecureConnection(false);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 0d93185f5e8..3f58250e6d6 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -71,6 +71,7 @@ public class ProxyParserTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
//enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.ofNullable(2));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index 6beed27cb66..d06cf4201ff 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -52,6 +52,7 @@ import org.testng.annotations.Test;
@Slf4j
public class ProxyRefreshAuthTest extends ProducerConsumerBase {
+ private static final String CLUSTER_NAME = "proxy-authorization";
private final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
private ProxyService proxyService;
@@ -84,7 +85,7 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
properties.setProperty("tokenAllowedClockSkewSeconds", "2");
conf.setProperties(properties);
- conf.setClusterName("proxy-authorization");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
conf.setAuthenticationRefreshCheckSeconds(1);
@@ -116,6 +117,7 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
index 137ea829515..a1ffc13ee93 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRolesEnforcementTest.java
@@ -49,6 +49,7 @@ import org.testng.annotations.Test;
public class ProxyRolesEnforcementTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(ProxyRolesEnforcementTest.class);
+ private static final String CLUSTER_NAME = "test";
public static class BasicAuthenticationData implements
AuthenticationDataProvider {
private final String authParam;
@@ -154,7 +155,7 @@ public class ProxyRolesEnforcementTest extends
ProducerConsumerBase {
providers.add(BasicAuthenticationProvider.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("test");
+ conf.setClusterName(CLUSTER_NAME);
Set<String> proxyRoles = new HashSet<>();
proxyRoles.add("proxy");
conf.setProxyRoles(proxyRoles);
@@ -209,6 +210,7 @@ public class ProxyRolesEnforcementTest extends
ProducerConsumerBase {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
index 0c9fa5c7ac3..3e598a57277 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterDisableZeroCopyTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.proxy.server;
import java.util.Optional;
import org.testng.annotations.BeforeClass;
-public class ProxyServiceStarterDisableZeroCopyTest extends
ProxyServiceStarterTest{
+public class ProxyServiceStarterDisableZeroCopyTest extends
ProxyServiceStarterTest {
@Override
@BeforeClass
@@ -35,6 +35,7 @@ public class ProxyServiceStarterDisableZeroCopyTest extends
ProxyServiceStarterT
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
serviceStarter.getConfig().setProxyZeroCopyModeEnabled(false);
+ serviceStarter.getConfig().setClusterName(configClusterName);
serviceStarter.start();
serviceUrl = serviceStarter.getProxyService().getServiceUrl();
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
index 71b1087ee64..f2632861253 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -61,6 +61,7 @@ public class ProxyServiceStarterTest extends
MockedPulsarServiceBaseTest {
serviceStarter.getConfig().setServicePort(Optional.of(0));
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
+ serviceStarter.getConfig().setClusterName(configClusterName);
serviceStarter.start();
serviceUrl = serviceStarter.getProxyService().getServiceUrl();
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
index b21162577a2..61718bbac3a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceTlsStarterTest.java
@@ -68,6 +68,7 @@ public class ProxyServiceTlsStarterTest extends
MockedPulsarServiceBaseTest {
serviceStarter.getConfig().setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
serviceStarter.getConfig().setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
serviceStarter.getConfig().setBrokerProxyAllowedTargetPorts("*");
+ serviceStarter.getConfig().setClusterName(configClusterName);
serviceStarter.start();
serviceUrl = serviceStarter.getProxyService().getServiceUrlTls();
webPort = serviceStarter.getServer().getListenPortHTTP().get();
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
index 155fbf616b0..2866c6c2690 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
@@ -72,6 +72,7 @@ public class ProxyStatsTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
// enable full parsing feature
proxyConfig.setProxyLogLevel(Optional.of(2));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
index 79ea7c5d6a3..6e66008c15a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStuckConnectionTest.java
@@ -79,6 +79,7 @@ public class ProxyStuckConnectionTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setServicePort(Optional.ofNullable(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(configClusterName);
startProxyService();
// use the same port for subsequent restarts
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index ac08052aaf1..9bc12dcc6fc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -106,6 +106,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
}
@Override
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index a1b27abece4..4e300d39741 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -61,6 +61,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest
{
proxyConfig.setTlsKeyFilePath(PROXY_KEY_FILE_PATH);
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
index ec5cace8a06..16f610d6d0a 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsWithAuthTest.java
@@ -73,6 +73,7 @@ public class ProxyTlsWithAuthTest extends
MockedPulsarServiceBaseTest {
" \"issuerUrl\":\"" + server.getIssuer() + "\"," +
" \"audience\": \"an-audience\"," +
" \"privateKey\":\"file://" + tempFile.getAbsolutePath() + "\"}");
+ proxyConfig.setClusterName(configClusterName);
proxyService = Mockito.spy(new ProxyService(proxyConfig, new
AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
index e0dcefe2714..cf9ad5831ec 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationNegTest.java
@@ -57,6 +57,7 @@ import org.testng.collections.Maps;
public class ProxyWithAuthorizationNegTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(ProxyWithAuthorizationNegTest.class);
+ private static final String CLUSTER_NAME = "proxy-authorization-neg";
private final String TLS_PROXY_TRUST_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cacert.pem";
private final String TLS_PROXY_CERT_FILE_PATH =
"./src/test/resources/authentication/tls/ProxyWithAuthorizationTest/proxy-cert.pem";
@@ -104,7 +105,7 @@ public class ProxyWithAuthorizationNegTest extends
ProducerConsumerBase {
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("proxy-authorization-neg");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
super.init();
@@ -121,6 +122,7 @@ public class ProxyWithAuthorizationNegTest extends
ProducerConsumerBase {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
+ proxyConfig.setClusterName(CLUSTER_NAME);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
index 4e4c3c550cf..bc96c7ea510 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithAuthorizationTest.java
@@ -64,6 +64,7 @@ import org.testng.collections.Maps;
public class ProxyWithAuthorizationTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(ProxyWithAuthorizationTest.class);
+ private static final String CLUSTER_NAME = "proxy-authorization";
private final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
private final String CLIENT_TOKEN = AuthTokenUtils.createToken(SECRET_KEY,
"Client", Optional.empty());
@@ -189,7 +190,7 @@ public class ProxyWithAuthorizationTest extends
ProducerConsumerBase {
properties.setProperty("tokenSecretKey",
AuthTokenUtils.encodeKeyBase64(SECRET_KEY));
conf.setProperties(properties);
- conf.setClusterName("proxy-authorization");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
}
@@ -206,6 +207,7 @@ public class ProxyWithAuthorizationTest extends
ProducerConsumerBase {
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setBrokerWebServiceURLTLS(pulsar.getWebServiceAddressTls());
proxyConfig.setAdvertisedAddress(null);
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setServicePortTls(Optional.of(0));
@@ -432,6 +434,7 @@ public class ProxyWithAuthorizationTest extends
ProducerConsumerBase {
proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerServiceURLTLS(pulsar.getBrokerServiceUrlTls());
proxyConfig.setAdvertisedAddress(null);
+ proxyConfig.setClusterName(CLUSTER_NAME);
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
index f997532b273..d3c05fec721 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithExtensibleLoadManagerTest.java
@@ -110,6 +110,7 @@ public class ProxyWithExtensibleLoadManagerTest extends
MultiBrokerBaseTest {
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
return proxyConfig;
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
index 14be7dadc41..5fb3e046824 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java
@@ -67,6 +67,7 @@ import org.testng.annotations.Test;
public class ProxyWithJwtAuthorizationTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(ProxyWithJwtAuthorizationTest.class);
+ private static final String CLUSTER_NAME = "proxy-authorization";
private final String ADMIN_ROLE = "admin";
private final String PROXY_ROLE = "proxy";
@@ -104,7 +105,7 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("proxy-authorization");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
super.init();
@@ -119,6 +120,7 @@ public class ProxyWithJwtAuthorizationTest extends
ProducerConsumerBase {
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setClusterName(CLUSTER_NAME);
// enable auth&auth and use JWT at proxy
proxyConfig.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
index e09194bb21d..9d9490e74b5 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithoutServiceDiscoveryTest.java
@@ -54,9 +54,11 @@ import org.testng.collections.Maps;
public class ProxyWithoutServiceDiscoveryTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(ProxyWithoutServiceDiscoveryTest.class);
+ private static final String CLUSTER_NAME = "without-service-discovery";
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+
@BeforeMethod
@Override
protected void setup() throws Exception {
@@ -89,7 +91,7 @@ public class ProxyWithoutServiceDiscoveryTest extends
ProducerConsumerBase {
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("without-service-discovery");
+ conf.setClusterName(CLUSTER_NAME);
conf.setNumExecutorThreadPoolSize(5);
super.init();
@@ -106,6 +108,7 @@ public class ProxyWithoutServiceDiscoveryTest extends
ProducerConsumerBase {
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
+ proxyConfig.setClusterName(CLUSTER_NAME);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(PROXY_CERT_FILE_PATH);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
index a44e2a85efa..57522186c8f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/SuperUserAuthedAdminProxyHandlerTest.java
@@ -80,6 +80,7 @@ public class SuperUserAuthedAdminProxyHandlerTest extends
MockedPulsarServiceBas
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setWebServicePortTls(Optional.of(0));
proxyConfig.setTlsEnabledWithBroker(true);
+ proxyConfig.setClusterName(configClusterName);
// enable tls and auth&auth at proxy
proxyConfig.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
index d239815ae81..fe8b1f45385 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/UnauthedAdminProxyHandlerTest.java
@@ -75,6 +75,7 @@ public class UnauthedAdminProxyHandlerTest extends
MockedPulsarServiceBaseTest {
proxyConfig.setStatusFilePath(STATUS_FILE_PATH);
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+ proxyConfig.setClusterName(configClusterName);
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index df36c35a191..5582931851b 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -55,6 +55,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-broker-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-common</artifactId>
@@ -73,6 +80,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-proxy</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>managed-ledger</artifactId>
@@ -169,7 +182,6 @@
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
@@ -189,6 +201,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- kinesis-->
<dependency>
<groupId>org.testcontainers</groupId>
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
new file mode 100644
index 00000000000..2b115ca6b95
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import java.time.Duration;
+import org.apache.http.HttpStatus;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.MountableFile;
+
+/**
+ * Test container that runs the {@code otel/opentelemetry-collector-contrib} image.
+ *
+ * <p>The container is started with the classpath resource
+ * {@code containers/otel-collector-config.yaml} copied to {@code /etc/otel-collector-config.yaml}
+ * and passed to the image via {@code --config}. Three ports are exposed; judging by the constant
+ * names they are the OTLP receiver, the Prometheus exporter and the zPages endpoint (the collector
+ * configuration file itself is not part of this class, so confirm against that resource).
+ */
+public class OpenTelemetryCollectorContainer extends ChaosContainer<OpenTelemetryCollectorContainer> {
+
+    // NOTE(review): ":latest" is a floating tag; pinning a collector version would make runs reproducible.
+    private static final String IMAGE_NAME = "otel/opentelemetry-collector-contrib:latest";
+    private static final String NAME = "otel-collector";
+
+    public static final int PROMETHEUS_EXPORTER_PORT = 8889;
+    private static final int OTLP_RECEIVER_PORT = 4317;
+    private static final int ZPAGES_PORT = 55679;
+
+    /**
+     * Creates the collector container.
+     *
+     * @param clusterName name of the test cluster; it is also used as the prefix of the container name
+     */
+    public OpenTelemetryCollectorContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+    }
+
+    /**
+     * Installs the collector configuration, exposes the collector ports and makes start-up wait
+     * until {@code /debug/servicez} on the zPages port answers with HTTP 200 (at most 300 seconds).
+     */
+    @Override
+    protected void configure() {
+        super.configure();
+
+        this.withCopyFileToContainer(
+                    MountableFile.forClasspathResource("containers/otel-collector-config.yaml", 0644),
+                    "/etc/otel-collector-config.yaml")
+                .withCommand("--config=/etc/otel-collector-config.yaml")
+                .withExposedPorts(OTLP_RECEIVER_PORT, PROMETHEUS_EXPORTER_PORT, ZPAGES_PORT)
+                .waitingFor(new HttpWaitStrategy()
+                        .forPath("/debug/servicez")
+                        .forPort(ZPAGES_PORT)
+                        .forStatusCode(HttpStatus.SC_OK)
+                        .withStartupTimeout(Duration.ofSeconds(300)));
+    }
+
+    /**
+     * Returns the container name, built as {@code <clusterName>-otel-collector}.
+     */
+    @Override
+    public String getContainerName() {
+        return clusterName + "-" + NAME;
+    }
+
+    /**
+     * Returns the OTLP endpoint URL, {@code http://otel-collector:4317}.
+     *
+     * <p>The host part is the fixed name {@code otel-collector}, not the mapped host address, so the
+     * URL presumably only resolves for containers that can reach this one under that network alias
+     * (TODO confirm against how the cluster registers external services).
+     *
+     * @return the OTLP endpoint URL
+     */
+    public String getOtlpEndpoint() {
+        // Plain concatenation instead of String.format("%d"): the formatter localizes digits using the
+        // default locale, which could yield a non-ASCII port number in the URL on some systems.
+        return "http://" + NAME + ":" + OTLP_RECEIVER_PORT;
+    }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 56d64ce5b2c..77cdc1bfd28 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -26,6 +26,7 @@ import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -70,6 +71,7 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
public static final boolean PULSAR_CONTAINERS_LEAVE_RUNNING =
Boolean.parseBoolean(System.getenv("PULSAR_CONTAINERS_LEAVE_RUNNING"));
+ @Getter
protected final String hostname;
private final String serviceName;
private final String serviceEntryPoint;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
new file mode 100644
index 00000000000..38afc1f127d
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.metrics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.apache.pulsar.functions.worker.PulsarWorkerOpenTelemetry;
+import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import
org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Sanity tests checking that the broker, the proxy and a function worker of a test cluster publish
+ * OpenTelemetry metrics, once through an OTLP collector and once through a per-process Prometheus
+ * endpoint.
+ */
+public class OpenTelemetrySanityTest {
+
+    // Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector.
+    @Test(timeOut = 360_000)
+    public void testOpenTelemetryMetricsOtlpExport() throws Exception {
+        var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
+        var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName);
+
+        var exporter = "otlp";
+        var otlpEndpointProp =
+                Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint());
+
+        var brokerCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp);
+        var proxyCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp);
+        var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, otlpEndpointProp);
+
+        var spec = PulsarClusterSpec.builder()
+                .clusterName(clusterName)
+                .brokerEnvs(brokerCollectorProps)
+                .proxyEnvs(proxyCollectorProps)
+                .externalService("otel-collector", openTelemetryCollectorContainer)
+                .functionWorkerEnvs(functionWorkerCollectorProps)
+                .build();
+        @Cleanup("stop")
+        var pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+        pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);
+
+        // TODO: Validate cluster name and service version are present once
+        // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
+        var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
+        var collectorPort = OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT;
+        // Every service is expected to show up in the collector's Prometheus output under its own "job" label.
+        awaitMetrics(openTelemetryCollectorContainer, collectorPort, metrics ->
+                !metrics.findByNameAndLabels(metricName, "job", PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty());
+        awaitMetrics(openTelemetryCollectorContainer, collectorPort, metrics ->
+                !metrics.findByNameAndLabels(metricName, "job", PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty());
+        awaitMetrics(openTelemetryCollectorContainer, collectorPort, metrics ->
+                !metrics.findByNameAndLabels(metricName, "job", PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty());
+    }
+
+    /*
+     * Validate that the OpenTelemetry metrics can be exported to a local Prometheus endpoint running in the same
+     * process space as the broker/proxy/function-worker.
+     * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter
+     */
+    @Test(timeOut = 360_000)
+    public void testOpenTelemetryMetricsPrometheusExport() throws Exception {
+        var prometheusExporterPort = 9464;
+        var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
+
+        var exporter = "prometheus";
+        var prometheusExporterPortProp =
+                Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", Integer.toString(prometheusExporterPort));
+
+        var brokerCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp);
+        var proxyCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp);
+        var functionWorkerCollectorProps = getOpenTelemetryProps(exporter, prometheusExporterPortProp);
+
+        var spec = PulsarClusterSpec.builder()
+                .clusterName(clusterName)
+                .brokerEnvs(brokerCollectorProps)
+                .brokerAdditionalPorts(List.of(prometheusExporterPort))
+                .proxyEnvs(proxyCollectorProps)
+                .proxyAdditionalPorts(List.of(prometheusExporterPort))
+                .functionWorkerEnvs(functionWorkerCollectorProps)
+                .functionWorkerAdditionalPorts(List.of(prometheusExporterPort))
+                .build();
+        @Cleanup("stop")
+        var pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+        pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1);
+
+        var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
+
+        var broker = pulsarCluster.getBroker(0);
+        awaitMetrics(broker, prometheusExporterPort, metrics -> !metrics.findByNameAndLabels(metricName,
+                Pair.of("pulsar_cluster", clusterName),
+                Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME),
+                Pair.of("service_version", PulsarVersion.getVersion()),
+                Pair.of("host_name", broker.getHostname())).isEmpty());
+
+        var proxy = pulsarCluster.getProxy();
+        awaitMetrics(proxy, prometheusExporterPort, metrics -> !metrics.findByNameAndLabels(metricName,
+                Pair.of("pulsar_cluster", clusterName),
+                Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME),
+                Pair.of("service_version", PulsarVersion.getVersion()),
+                Pair.of("host_name", proxy.getHostname())).isEmpty());
+
+        // Resolve the worker once so the scraped endpoint and the expected host_name refer to the same container.
+        var worker = pulsarCluster.getAnyWorker();
+        awaitMetrics(worker, prometheusExporterPort, metrics -> !metrics.findByNameAndLabels(metricName,
+                Pair.of("pulsar_cluster", clusterName),
+                Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME),
+                Pair.of("service_version", PulsarVersion.getVersion()),
+                Pair.of("host_name", worker.getHostname())).isEmpty());
+    }
+
+    /**
+     * Polls the Prometheus endpoint of {@code container} once per second, for at most 90 seconds, until
+     * {@code condition} accepts the scraped metrics. Exceptions thrown while scraping or evaluating the
+     * condition are ignored and the poll is simply repeated.
+     *
+     * @param container container exposing the Prometheus endpoint
+     * @param port container-side port of the endpoint; it is translated to the mapped host port
+     * @param condition check applied to every scrape result
+     */
+    private static void awaitMetrics(ChaosContainer<?> container, int port,
+            java.util.function.Predicate<PrometheusMetricsClient.Metrics> condition) {
+        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS)
+                .until(() -> condition.test(getMetricsFromPrometheus(container, port)));
+    }
+
+    /**
+     * Scrapes the metrics served by {@code container} on the host port mapped to {@code port}.
+     *
+     * @param container container exposing the Prometheus endpoint
+     * @param port container-side port of the endpoint
+     * @return the metrics returned by the Prometheus client
+     */
+    private static PrometheusMetricsClient.Metrics getMetricsFromPrometheus(ChaosContainer<?> container, int port) {
+        var client = new PrometheusMetricsClient(container.getHost(), container.getMappedPort(port));
+        return client.getMetrics();
+    }
+
+    /**
+     * Builds the environment variables that configure OpenTelemetry for a cluster component: the SDK is
+     * enabled, the metric export interval is set to {@code 1000} and {@code exporter} is selected as the
+     * metrics exporter. Entries in {@code extraProps} are applied afterwards, so they replace a default
+     * with the same key.
+     *
+     * @param exporter value for {@code OTEL_METRICS_EXPORTER}
+     * @param extraProps additional environment variables as key/value pairs
+     * @return a new mutable map holding the defaults plus the extra properties
+     */
+    @SafeVarargs // The varargs array is only read, never stored or exposed.
+    private static Map<String, String> getOpenTelemetryProps(String exporter, Pair<String, String>... extraProps) {
+        var defaultProps = Map.of(
+                "OTEL_SDK_DISABLED", "false",
+                "OTEL_METRIC_EXPORT_INTERVAL", "1000",
+                "OTEL_METRICS_EXPORTER", exporter
+        );
+        var props = new HashMap<>(defaultProps);
+        Arrays.stream(extraProps).forEach(p -> props.put(p.getKey(), p.getValue()));
+        return props;
+    }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index bc9b1e267b9..5f893f67f74 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -102,6 +102,8 @@ public class PulsarCluster {
private final ProxyContainer proxyContainer;
private Map<String, GenericContainer<?>> externalServices =
Collections.emptyMap();
private Map<String, Map<String, String>> externalServiceEnvs;
+ private final Map<String, String> functionWorkerEnvs;
+ private final List<Integer> functionWorkerAdditionalPorts;
private final String metadataStoreUrl;
private final String configurationMetadataStoreUrl;
@@ -182,6 +184,9 @@ public class PulsarCluster {
if (spec.proxyMountFiles != null) {
spec.proxyMountFiles.forEach(this.proxyContainer::withFileSystemBind);
}
+ if (spec.proxyAdditionalPorts != null) {
+
spec.proxyAdditionalPorts.forEach(this.proxyContainer::addExposedPort);
+ }
// create bookies
bookieContainers.putAll(
@@ -268,6 +273,8 @@ public class PulsarCluster {
workerContainers.values().forEach(c ->
c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
});
+ functionWorkerEnvs = spec.functionWorkerEnvs;
+ functionWorkerAdditionalPorts = spec.functionWorkerAdditionalPorts;
}
public String getPlainTextServiceUrl() {
@@ -475,23 +482,25 @@ public class PulsarCluster {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
workerContainers.putAll(runNumContainers(
- "functions-worker-process-" + suffix,
- numFunctionWorkers,
- (name) -> new WorkerContainer(clusterName, name)
- .withNetwork(network)
- .withNetworkAliases(name)
- // worker settings
- .withEnv("PF_workerId", name)
- .withEnv("PF_workerHostname", name)
- .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
- .withEnv("PF_pulsarFunctionsCluster", clusterName)
- .withEnv("PF_pulsarServiceUrl", serviceUrl)
- .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
- // script
- .withEnv("clusterName", clusterName)
- .withEnv("zookeeperServers", ZKContainer.NAME)
- // bookkeeper tools
- .withEnv("zkServers", ZKContainer.NAME)
+ "functions-worker-process-" + suffix,
+ numFunctionWorkers,
+ (name) -> new WorkerContainer(clusterName, name)
+ .withNetwork(network)
+ .withNetworkAliases(name)
+ // worker settings
+ .withEnv("PF_workerId", name)
+ .withEnv("PF_workerHostname", name)
+ .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("PF_pulsarFunctionsCluster", clusterName)
+ .withEnv("PF_pulsarServiceUrl", serviceUrl)
+ .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+ // script
+ .withEnv("clusterName", clusterName)
+ .withEnv("zookeeperServers", ZKContainer.NAME)
+ // bookkeeper tools
+ .withEnv("zkServers", ZKContainer.NAME)
+ .withEnv(functionWorkerEnvs)
+ .withExposedPorts(functionWorkerAdditionalPorts.toArray(new
Integer[0]))
));
this.startWorkers();
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index b705b347cff..8a991be49fa 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -147,6 +147,12 @@ public class PulsarClusterSpec {
*/
Map<String, String> bookkeeperEnvs;
+ /**
+ * Specify envs for function workers.
+ */
+ @Singular
+ Map<String, String> functionWorkerEnvs;
+
/**
* Specify mount files.
*/
@@ -170,6 +176,17 @@ public class PulsarClusterSpec {
*/
List<Integer> bookieAdditionalPorts;
+ /**
+ * Additional ports to expose on proxy containers.
+ */
+ List<Integer> proxyAdditionalPorts;
+
+ /**
+ * Additional ports to expose on function workers.
+ */
+ @Singular
+ List<Integer> functionWorkerAdditionalPorts;
+
/**
* Enable TLS for connection.
*/
diff --git
a/tests/integration/src/test/resources/containers/otel-collector-config.yaml
b/tests/integration/src/test/resources/containers/otel-collector-config.yaml
new file mode 100644
index 00000000000..bd332f04283
--- /dev/null
+++ b/tests/integration/src/test/resources/containers/otel-collector-config.yaml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Metrics pipeline: OTLP over gRPC in -> batch processor -> Prometheus exporter out.
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        # Bind explicitly on all interfaces. Newer collector releases default the OTLP
+        # receiver to localhost, which other containers on the network cannot reach.
+        endpoint: "0.0.0.0:4317"
+
+exporters:
+  # Exposes the received metrics for scraping on port 8889.
+  prometheus:
+    endpoint: "0.0.0.0:8889"
+
+processors:
+  batch:
+
+extensions:
+  health_check:
+  zpages:
+    endpoint: :55679
+
+service:
+  extensions: [zpages, health_check]
+  pipelines:
+    metrics:
+      receivers: [otlp]
+      processors: [batch]
+      exporters: [prometheus]
\ No newline at end of file
diff --git a/tests/integration/src/test/resources/pulsar-metrics.xml
b/tests/integration/src/test/resources/pulsar-metrics.xml
new file mode 100644
index 00000000000..1c87f2bdf0d
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-metrics.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
+<suite name="Pulsar Metrics Integration Tests" verbose="2" annotations="JDK">
+ <test name="metrics-test-suite" preserve-order="true">
+ <classes>
+ <class
name="org.apache.pulsar.tests.integration.metrics.OpenTelemetrySanityTest"/>
+ </classes>
+ </test>
+</suite>
diff --git a/tests/integration/src/test/resources/pulsar.xml
b/tests/integration/src/test/resources/pulsar.xml
index bdc5f27cc78..aa9a59a6cda 100644
--- a/tests/integration/src/test/resources/pulsar.xml
+++ b/tests/integration/src/test/resources/pulsar.xml
@@ -37,5 +37,6 @@
<suite-file path="./pulsar-python.xml" />
<suite-file path="./pulsar-semantics.xml" />
<suite-file path="./pulsar-upgrade.xml" />
+ <suite-file path="./pulsar-metrics.xml" />
</suite-files>
</suite>