asafm commented on code in PR #22010: URL: https://github.com/apache/pulsar/pull/22010#discussion_r1474451401
########## pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PulsarWorkerOpenTelemetry.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *   http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.common.stats.OpenTelemetryService; +import org.apache.pulsar.functions.instance.InstanceConfig; + +public class PulsarWorkerOpenTelemetry implements Closeable { + +    private final OpenTelemetryService openTelemetryService; + +    @Getter +    private final Meter meter; + +    public PulsarWorkerOpenTelemetry(InstanceConfig instanceConfig) { +        openTelemetryService = OpenTelemetryService.builder().clusterName(instanceConfig.getClusterName()).build(); +        meter = openTelemetryService.getMeter("pulsar.worker"); Review Comment: The meter needs to be different, as detailed in my previous comment. ########## pulsar-otel-metrics-provider/pom.xml: ########## @@ -0,0 +1,113 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership.  The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.  You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied.  See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar</artifactId> + <version>3.3.0-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-otel-metrics-provider</artifactId> + <description>OpenTelemetry integration provider</description> Review Comment: I don't think it's a provider in the sense of, e.g., an Authentication Provider, since we do not intend this to be customizable or exposed as an interface. Maybe "OpenTelemetry integration". Also, note that `pulsar-otel-integration` might be a more appropriate module name since OTel, for now, is just metrics, but it will cover tracing, logging, and much more in the future. ########## pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java: ########## @@ -223,6 +225,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, .quantile(0.99, 0.01) .quantile(0.999, 0.01) .create()); + this.openTelemetry = new PulsarWorkerOpenTelemetry(config); Review Comment: The out-of-scope section of the PIP said: > * Ability to add metrics using OpenTelemetry as Pulsar Function author.
Aside from keeping the PR size reasonable, it requires more thinking and making sure we have thought everything through. When writing PIP-264, I remember that I needed more time to think it through, so I prefer to defer it to a sub-PIP of its own. Specifically here, if I'm not mistaken, `ContextImpl` is used by the process executing the function, not by the function worker itself. The idea was that the function would have its own Meter, named after the function. I don't have the details since I was counting on doing it in its own PIP. I have no idea yet how the configuration of the function runtime would happen, especially for k8s. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *   http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.stats.OpenTelemetryService; + +public class PulsarBrokerOpenTelemetry implements Closeable { + + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarBrokerOpenTelemetry(PulsarService pulsar) { Review Comment: I think we can limit the surface area to just the `clusterName` or `ServiceConfiguration`. This makes it easy to test without mocking too much. Also makes it easier to understand what this class does. ########## pulsar-otel-metrics-provider/pom.xml: ########## @@ -0,0 +1,113 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar</artifactId> + <version>3.3.0-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-otel-metrics-provider</artifactId> + <description>OpenTelemetry integration provider</description> + + <dependencies> + <!-- OpenTelemetry dependencies --> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-exporter-otlp</artifactId> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-exporter-prometheus</artifactId> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk</artifactId> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> Review Comment: Why is this needed in this module? ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.common.stats.OpenTelemetryService; + +public class PulsarBrokerOpenTelemetry implements Closeable { + + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarBrokerOpenTelemetry(PulsarService pulsar) { + openTelemetryService = OpenTelemetryService.builder().clusterName(pulsar.getConfig().getClusterName()).build(); + meter = openTelemetryService.getMeter("pulsar.broker"); Review Comment: The meter name should be different - I have researched it and wrote the summary in the PIP: https://github.com/apache/pulsar/blob/master/pip/pip-320.md#meter ########## pulsar-otel-metrics-provider/src/main/java/org/apache/pulsar/common/stats/OpenTelemetryService.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.Singular; +import org.apache.commons.lang3.StringUtils; + +/** + * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry + * SDK with a set of overrode properties. Once initialized, furnishes access to OpenTelemetry meters. Review Comment: overrode --> override access to OpenTelemetry meters. --> access to OpenTelemetry ########## pulsar-otel-metrics-provider/src/main/java/org/apache/pulsar/common/stats/OpenTelemetryService.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.Singular; +import org.apache.commons.lang3.StringUtils; + +/** + * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry + * SDK with a set of overrode properties. Once initialized, furnishes access to OpenTelemetry meters. 
+ */ +public class OpenTelemetryService implements Closeable { + +    public static final String OTEL_SDK_DISABLED = "otel.sdk.disabled"; +    private static final String MAX_CARDINALITY_LIMIT_KEY = "otel.experimental.metrics.cardinality.limit"; +    public static final int MAX_CARDINALITY_LIMIT = 10000; + +    private final OpenTelemetrySdk openTelemetrySdk; + +    @Builder +    public OpenTelemetryService(String clusterName, +                                @Singular Map<String, String> extraProperties, Review Comment: Check later what this is used for. ########## pulsar-otel-metrics-provider/src/main/java/org/apache/pulsar/common/stats/OpenTelemetryService.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *   http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.common.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.Singular; +import org.apache.commons.lang3.StringUtils; + +/** + * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry + * SDK with a set of overrode properties. Once initialized, furnishes access to OpenTelemetry meters. + */ +public class OpenTelemetryService implements Closeable { + + public static final String OTEL_SDK_DISABLED = "otel.sdk.disabled"; + private static final String MAX_CARDINALITY_LIMIT_KEY = "otel.experimental.metrics.cardinality.limit"; + public static final int MAX_CARDINALITY_LIMIT = 10000; + + private final OpenTelemetrySdk openTelemetrySdk; + + @Builder + public OpenTelemetryService(String clusterName, + @Singular Map<String, String> extraProperties, + // Allows customizing the SDK builder; for testing purposes only. + @VisibleForTesting AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder) { + checkArgument(StringUtils.isNotEmpty(clusterName), "Cluster name cannot be empty"); + if (sdkBuilder == null) { + sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder(); + } + + Map<String, String> overrideProperties = new HashMap<>(); + overrideProperties.put(OTEL_SDK_DISABLED, "true"); + // Cardinality limit property is exclusive, so we need to add 1. 
+        overrideProperties.put(MAX_CARDINALITY_LIMIT_KEY, Integer.toString(MAX_CARDINALITY_LIMIT + 1)); +        sdkBuilder.addPropertiesSupplier(() -> overrideProperties); +        sdkBuilder.addPropertiesSupplier(() -> extraProperties); + +        sdkBuilder.addResourceCustomizer( +                (resource, __) -> { +                    AttributeKey<String> clusterNameAttribute = AttributeKey.stringKey("pulsar.cluster"); +                    if (resource.getAttribute(clusterNameAttribute) != null) { +                        // Do not override if already set (via system properties or environment variables). +                        return resource; +                    } +                    return resource.merge(Resource.builder().put(clusterNameAttribute, clusterName).build()); +                }); + +        openTelemetrySdk = sdkBuilder.build().getOpenTelemetrySdk(); +    } + +    public Meter getMeter(String instrumentationScopeName) { Review Comment: I'd prefer this to return `OpenTelemetry` (not `OpenTelemetrySdk`, since we want Pulsar to use the API, not the SDK, to define metrics and other things like tracing). Functions, for example, will create their own Meters and maybe, in the future, their own tracing, so I think it's best to work with the `OpenTelemetry` interface. ########## pulsar-otel-metrics-provider/src/test/java/org/apache/pulsar/common/stats/OpenTelemetryServiceTest.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *   http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.stats; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.function.Predicate; +import lombok.Cleanup; +import org.apache.commons.lang3.StringUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryServiceTest { + + private OpenTelemetryService openTelemetryService; + private InMemoryMetricReader reader; + private Meter meter; + + @BeforeMethod + public void setup() throws Exception { + reader = InMemoryMetricReader.create(); + openTelemetryService = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("openTelemetryServiceTestCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). Review Comment: We need to verify `OpenTelemetryService` starts disabled. 
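A rough, self-contained sketch of what that check could pin down, assuming we only want to verify the property precedence and not exercise the real SDK. `OTelDefaultsSketch`, `overrideProperties()` and `resolve(...)` are hypothetical names; the override values are copied from the constructor above, and the "later map wins" ordering is the one the existing tests already rely on when they pass `OTEL_SDK_DISABLED=false` as an extra property. A test against the real SDK would presumably also assert that nothing is exported while the service is disabled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how OpenTelemetryService resolves its properties; not the real SDK.
public class OTelDefaultsSketch {

    static final String OTEL_SDK_DISABLED = "otel.sdk.disabled";
    static final String MAX_CARDINALITY_LIMIT_KEY = "otel.experimental.metrics.cardinality.limit";
    static final int MAX_CARDINALITY_LIMIT = 10000;

    // Same defaults the constructor under review installs first.
    static Map<String, String> overrideProperties() {
        Map<String, String> overrides = new HashMap<>();
        overrides.put(OTEL_SDK_DISABLED, "true");
        // The cardinality limit property is exclusive, hence the + 1.
        overrides.put(MAX_CARDINALITY_LIMIT_KEY, Integer.toString(MAX_CARDINALITY_LIMIT + 1));
        return overrides;
    }

    // Assumption: property maps are applied in order and a later map overrides an earlier one.
    @SafeVarargs
    static Map<String, String> resolve(Map<String, String>... suppliersInOrder) {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> properties : suppliersInOrder) {
            merged.putAll(properties);
        }
        return merged;
    }

    public static void main(String[] args) {
        // No extra properties: the service must come up disabled.
        Map<String, String> byDefault = resolve(overrideProperties(), Map.of());
        // Tests opt in explicitly, as setup() does today.
        Map<String, String> optedIn = resolve(overrideProperties(), Map.of(OTEL_SDK_DISABLED, "false"));

        System.out.println(byDefault.get(OTEL_SDK_DISABLED));         // true
        System.out.println(optedIn.get(OTEL_SDK_DISABLED));           // false
        System.out.println(byDefault.get(MAX_CARDINALITY_LIMIT_KEY)); // 10001
    }
}
```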
########## pulsar-otel-metrics-provider/src/test/java/org/apache/pulsar/common/stats/OpenTelemetryServiceTest.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.stats; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.function.Predicate; +import lombok.Cleanup; +import org.apache.commons.lang3.StringUtils; +import 
org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryServiceTest { + + private OpenTelemetryService openTelemetryService; + private InMemoryMetricReader reader; + private Meter meter; + + @BeforeMethod + public void setup() throws Exception { + reader = InMemoryMetricReader.create(); + openTelemetryService = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("openTelemetryServiceTestCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). Review Comment: If we get the builder, we don't need this. We can use the builder to add that property. ########## pulsar-otel-metrics-provider/src/main/java/org/apache/pulsar/common/stats/OpenTelemetryService.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.common.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.Singular; +import org.apache.commons.lang3.StringUtils; + +/** + * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry + * SDK with a set of overrode properties. Once initialized, furnishes access to OpenTelemetry meters. + */ +public class OpenTelemetryService implements Closeable { + +    public static final String OTEL_SDK_DISABLED = "otel.sdk.disabled"; +    private static final String MAX_CARDINALITY_LIMIT_KEY = "otel.experimental.metrics.cardinality.limit"; +    public static final int MAX_CARDINALITY_LIMIT = 10000; + +    private final OpenTelemetrySdk openTelemetrySdk; + +    @Builder +    public OpenTelemetryService(String clusterName, +                                @Singular Map<String, String> extraProperties, +                                // Allows customizing the SDK builder; for testing purposes only. +                                @VisibleForTesting AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder) { Review Comment: I would do the customization as `@VisibleForTesting Consumer<AutoConfiguredOpenTelemetrySdkBuilder> sdkBuilderCustomizer`. We will probably have a `@VisibleForTesting setOTelSdkBuilderCustomizer(...)` down the road.
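For illustration, a minimal sketch of the `Consumer`-based customization, assuming the service keeps creating its own builder and applies Pulsar's defaults before handing it to the customizer. `FakeSdkBuilder` is a hypothetical stand-in for `AutoConfiguredOpenTelemetrySdkBuilder` (it only records property suppliers, later ones winning), so the snippet runs without the OTel dependency; `Service` is likewise hypothetical. With the real builder, the test customizer could also register the `InMemoryMetricReader` through `addMeterProviderCustomizer`, the way `getSdkBuilder` does in the test today.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class SdkBuilderCustomizerSketch {

    // Hypothetical stand-in for AutoConfiguredOpenTelemetrySdkBuilder: records property suppliers only.
    static final class FakeSdkBuilder {
        private final List<Supplier<Map<String, String>>> suppliers = new ArrayList<>();

        FakeSdkBuilder addPropertiesSupplier(Supplier<Map<String, String>> supplier) {
            suppliers.add(supplier);
            return this;
        }

        // Later suppliers override earlier ones.
        Map<String, String> resolveProperties() {
            Map<String, String> merged = new HashMap<>();
            for (Supplier<Map<String, String>> supplier : suppliers) {
                merged.putAll(supplier.get());
            }
            return merged;
        }
    }

    // Hypothetical service: it owns the builder; tests only get a hook to tweak it.
    static final class Service {
        private final Map<String, String> properties;

        Service(String clusterName, Consumer<FakeSdkBuilder> sdkBuilderCustomizer) {
            if (clusterName == null || clusterName.isEmpty()) {
                throw new IllegalArgumentException("Cluster name cannot be empty");
            }
            FakeSdkBuilder builder = new FakeSdkBuilder();
            // Pulsar's defaults are applied first...
            builder.addPropertiesSupplier(() -> Map.of("otel.sdk.disabled", "true"));
            // ...then the optional, test-only customizer gets the last word.
            if (sdkBuilderCustomizer != null) {
                sdkBuilderCustomizer.accept(builder);
            }
            this.properties = builder.resolveProperties();
        }

        String property(String key) {
            return properties.get(key);
        }
    }

    public static void main(String[] args) {
        Service production = new Service("cluster-a", null);
        Service underTest = new Service("cluster-a",
                builder -> builder.addPropertiesSupplier(() -> Map.of("otel.sdk.disabled", "false")));
        System.out.println(production.property("otel.sdk.disabled")); // true
        System.out.println(underTest.property("otel.sdk.disabled"));  // false
    }
}
```

With this shape, production callers never see a builder at all, and a test can enable the SDK through the customizer instead of via `extraProperty(...)`.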
########## pulsar-otel-metrics-provider/src/test/java/org/apache/pulsar/common/stats/OpenTelemetryServiceTest.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.stats; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.function.Predicate; +import lombok.Cleanup; +import org.apache.commons.lang3.StringUtils; +import 
org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryServiceTest { + + private OpenTelemetryService openTelemetryService; + private InMemoryMetricReader reader; + private Meter meter; + + @BeforeMethod + public void setup() throws Exception { + reader = InMemoryMetricReader.create(); + openTelemetryService = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("openTelemetryServiceTestCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + meter = openTelemetryService.getMeter("openTelemetryServiceTestInstrument"); + } + + @AfterMethod + public void teardown() throws Exception { + openTelemetryService.close(); + reader.close(); + } + + // Overrides the default sdkBuilder to include the InMemoryMetricReader for testing purposes. + private static AutoConfiguredOpenTelemetrySdkBuilder getSdkBuilder(MetricReader extraReader) { + return AutoConfiguredOpenTelemetrySdk.builder(). + addMeterProviderCustomizer((sdkMeterProviderBuilder, configProperties) -> { + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector( + sdkMeterProviderBuilder, extraReader, + // Override the max cardinality limit for this extra reader. 
+ instrumentType -> OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1); + return sdkMeterProviderBuilder; + }); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeNull() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeEmpty() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build(); + } + + @Test + public void testIsClusterNameSet() throws Exception { + @Cleanup + InMemoryMetricReader reader = InMemoryMetricReader.create(); + + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("testCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + + Predicate<MetricData> predicate = MetricDataMatcher.builder(). + resourceAttribute(Attributes.of(AttributeKey.stringKey("pulsar.cluster"), "testCluster")). + build(); + + Collection<MetricData> metricData = reader.collectAllMetrics(); + Assert.assertTrue(metricData.stream().anyMatch(predicate)); + } + + @Test + public void testIsInstrumentationNameSetOnMeter() throws Exception { + Meter meter = openTelemetryService.getMeter("testInstrumentationScope"); Review Comment: One of the reasons we want to use `OpenTelemetry` interface and not have our own on top is also since functions and plugins may want to specify the version for the instrumentation scope. ########## pulsar-otel-metrics-provider/src/test/java/org/apache/pulsar/common/stats/OpenTelemetryServiceTest.java: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.stats; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import lombok.Cleanup; +import org.apache.commons.lang3.StringUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class 
OpenTelemetryServiceTest { + + private OpenTelemetryService openTelemetryService; + private InMemoryMetricReader reader; + private Meter meter; + + @BeforeMethod + public void setup() throws Exception { + reader = InMemoryMetricReader.create(); + openTelemetryService = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("openTelemetryServiceTestCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + meter = openTelemetryService.getMeter("openTelemetryServiceTestInstrument"); + } + + @AfterMethod + public void teardown() throws Exception { + openTelemetryService.close(); + reader.close(); + } + + // Overrides the default sdkBuilder to include the InMemoryMetricReader for testing purposes. + private static AutoConfiguredOpenTelemetrySdkBuilder getSdkBuilder(MetricReader extraReader) { + return AutoConfiguredOpenTelemetrySdk.builder(). + addMeterProviderCustomizer((sdkMeterProviderBuilder, configProperties) -> { + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector( + sdkMeterProviderBuilder, extraReader, + // Override the max cardinality limit for this extra reader. + instrumentType -> OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1); + return sdkMeterProviderBuilder; + }); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeNull() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeEmpty() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build(); + } + + @Test + public void testIsClusterNameSet() throws Exception { + @Cleanup + InMemoryMetricReader reader = InMemoryMetricReader.create(); + + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). 
+ clusterName("testCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + + Predicate<MetricData> predicate = MetricDataMatcher.builder(). + resourceAttribute(Attributes.of(AttributeKey.stringKey("pulsar.cluster"), "testCluster")). + build(); + + Collection<MetricData> metricData = reader.collectAllMetrics(); + assertTrue(metricData.stream().anyMatch(predicate)); + } + + @Test + public void testIsInstrumentationNameSetOnMeter() throws Exception { + Meter meter = openTelemetryService.getMeter("testInstrumentationScope"); + meter.counterBuilder("dummyCounter").build().add(1); + MetricDataMatcher predicate = MetricDataMatcher.builder(). + name("dummyCounter"). + instrumentationScopeInfo(InstrumentationScopeInfo.create("testInstrumentationScope")). + build(); + Collection<MetricData> metricData = reader.collectAllMetrics(); + assertTrue(metricData.stream().anyMatch(predicate)); + } + + @Test + public void testMetricCardinality() throws Exception { Review Comment: The only thing we're testing here is that we've set the cardinality limit of the reader in our test (`getSdkBuilder`), so I don't think this test is needed. We need to test that the default cardinality limit is not the SDK default of 2k. One way of doing that is via the following test: * Create an OpenTelemetry service which, by using the builder, adds a Prometheus exporter. * Use the `OpenTelemetry` instance to create a meter, then a counter, then record a value for 10,500 distinct `Attributes`. * Create a `PrometheusClient` and use it to read the metrics and verify that 10k attribute sets are reported. ########## pulsar-otel-metrics-provider/src/test/java/org/apache/pulsar/common/stats/OpenTelemetryServiceTest.java: ########## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.stats; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import lombok.Cleanup; +import org.apache.commons.lang3.StringUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class 
OpenTelemetryServiceTest { + + private OpenTelemetryService openTelemetryService; + private InMemoryMetricReader reader; + private Meter meter; + + @BeforeMethod + public void setup() throws Exception { + reader = InMemoryMetricReader.create(); + openTelemetryService = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("openTelemetryServiceTestCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + meter = openTelemetryService.getMeter("openTelemetryServiceTestInstrument"); + } + + @AfterMethod + public void teardown() throws Exception { + openTelemetryService.close(); + reader.close(); + } + + // Overrides the default sdkBuilder to include the InMemoryMetricReader for testing purposes. + private static AutoConfiguredOpenTelemetrySdkBuilder getSdkBuilder(MetricReader extraReader) { + return AutoConfiguredOpenTelemetrySdk.builder(). + addMeterProviderCustomizer((sdkMeterProviderBuilder, configProperties) -> { + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector( + sdkMeterProviderBuilder, extraReader, + // Override the max cardinality limit for this extra reader. + instrumentType -> OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1); + return sdkMeterProviderBuilder; + }); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeNull() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeEmpty() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build(); + } + + @Test + public void testIsClusterNameSet() throws Exception { + @Cleanup + InMemoryMetricReader reader = InMemoryMetricReader.create(); + + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). 
+ clusterName("testCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + + Predicate<MetricData> predicate = MetricDataMatcher.builder(). + resourceAttribute(Attributes.of(AttributeKey.stringKey("pulsar.cluster"), "testCluster")). + build(); + + Collection<MetricData> metricData = reader.collectAllMetrics(); + assertTrue(metricData.stream().anyMatch(predicate)); + } + + @Test + public void testIsInstrumentationNameSetOnMeter() throws Exception { + Meter meter = openTelemetryService.getMeter("testInstrumentationScope"); + meter.counterBuilder("dummyCounter").build().add(1); + MetricDataMatcher predicate = MetricDataMatcher.builder(). + name("dummyCounter"). + instrumentationScopeInfo(InstrumentationScopeInfo.create("testInstrumentationScope")). + build(); + Collection<MetricData> metricData = reader.collectAllMetrics(); + assertTrue(metricData.stream().anyMatch(predicate)); + } + + @Test + public void testMetricCardinality() throws Exception { + LongCounter longCounter = meter.counterBuilder("dummyMetricCardinalityTest").build(); + + for (int i = 0; i < OpenTelemetryService.MAX_CARDINALITY_LIMIT; i++) { + longCounter.add(1, Attributes.of(AttributeKey.stringKey("attribute"), "value" + i)); + } + + Predicate<MetricData> hasOverflowAttribute = MetricDataMatcher.builder(). + name("dummyMetricCardinalityTest"). + dataAttribute(MetricStorage.CARDINALITY_OVERFLOW). 
+ build(); + + Collection<MetricData> metricData = reader.collectAllMetrics(); + assertTrue(metricData.stream().noneMatch(hasOverflowAttribute)); + + for (int i = 0; i < OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1; i++) { + longCounter.add(1, Attributes.of(AttributeKey.stringKey("attribute"), "value" + i)); + } + + metricData = reader.collectAllMetrics(); + assertTrue(metricData.stream().anyMatch(hasOverflowAttribute)); + } + + @Test + public void testLongCounter() throws Exception { + LongCounter longCounter = meter.counterBuilder("dummyLongCounter").build(); + longCounter.add(1, Attributes.of(AttributeKey.stringKey("dummyAttr"), "dummyValue")); + longCounter.add(2, Attributes.of(AttributeKey.stringKey("dummyAttr"), "dummyValue")); + + Predicate<MetricData> predicate = MetricDataMatcher.builder(). + name("dummyLongCounter"). + dataAttribute(Attributes.of(AttributeKey.stringKey("dummyAttr"), "dummyValue")). + type(MetricDataType.LONG_SUM). + longValue(3L). + build(); + + Collection<MetricData> metricData = reader.collectAllMetrics(); + assertTrue(metricData.stream().anyMatch(predicate)); + } + + @Test + public void testDoubleCounter() throws Exception { Review Comment: Why do we also need to test the double counter? I understand creating a counter as a sanity check, but I wouldn't go past that. ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.containers; + +import java.time.Duration; +import org.apache.http.HttpStatus; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.MountableFile; + +public class OpenTelemetryCollectorContainer extends ChaosContainer<OpenTelemetryCollectorContainer> { + + private static final String IMAGE_NAME = "otel/opentelemetry-collector-contrib:latest"; + private static final String NAME = "otel-collector"; + + public static final int PROMETHEUS_EXPORTER_PORT = 8889; + private static final int OTLP_RECEIVER_PORT = 4317; + private static final int ZPAGES_PORT = 55679; + + public OpenTelemetryCollectorContainer(String clusterName) { + super(clusterName, IMAGE_NAME); + } + + @Override + protected void configure() { + super.configure(); + + this.withCopyFileToContainer( + MountableFile.forClasspathResource("containers/otel-collector-config.yaml", 0644), + "/etc/otel-collector-config.yaml") + .withCommand("--config=/etc/otel-collector-config.yaml") + .withExposedPorts(OTLP_RECEIVER_PORT, PROMETHEUS_EXPORTER_PORT, ZPAGES_PORT) + .withCreateContainerCmdModifier(createContainerCmd -> { + createContainerCmd.withHostName(NAME); + createContainerCmd.withName(getContainerName()); Review Comment: What is this for? What is the value of `getContainerName`? 
########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/MetricsTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.metrics; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer; +import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class MetricsTest { + + /* + * Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector. 
+ * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsOtlpExport() throws Exception { + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName); + + var exporter = "otlp"; + var otlpEndpointProp = + Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint()); + + var brokerOtelServiceName = clusterName + "-broker"; + var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, otlpEndpointProp); + + var proxyOtelServiceName = clusterName + "-proxy"; + var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, otlpEndpointProp); + + var functionWorkerServiceNameSuffix = PulsarTestBase.randomName(); + var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix; + var functionWorkerCollectorProps = getCollectorProps(functionWorkerOtelServiceName, exporter, otlpEndpointProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .proxyEnvs(proxyCollectorProps) + .externalService("otel-collector", openTelemetryCollectorContainer) + .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1); + + // TODO: Validate cluster name is present once + // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved. + var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK. 
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", brokerOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", proxyOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", functionWorkerOtelServiceName))); + } + + /* + * Validate that the OpenTelemetry metrics can be exported to a local Prometheus endpoint running in the same + * process space as the broker/proxy/function-worker. + * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsPrometheusExport() throws Exception { + var prometheusExporterPort = 9464; + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + + var exporter = "prometheus"; + var prometheusExporterPortProp = + Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", Integer.toString(prometheusExporterPort)); + + var brokerOtelServiceName = clusterName + "-broker"; + var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, prometheusExporterPortProp); + + var proxyOtelServiceName = clusterName + "-proxy"; + var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, prometheusExporterPortProp); + + var functionWorkerServiceNameSuffix = PulsarTestBase.randomName(); + var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix; + var 
functionWorkerCollectorProps = + getCollectorProps(functionWorkerOtelServiceName, exporter, prometheusExporterPortProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .brokerAdditionalPorts(List.of(prometheusExporterPort)) + .proxyEnvs(proxyCollectorProps) + .proxyAdditionalPorts(List.of(prometheusExporterPort)) + .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps) + .functionWorkerAdditionalPorts(functionWorkerServiceNameSuffix, List.of(prometheusExporterPort)) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1); + var workerContainer = pulsarCluster.getAnyWorker(); + + var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK. + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(pulsarCluster.getAnyBroker(), prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", brokerOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(pulsarCluster.getProxy(), prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", proxyOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(workerContainer, prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", functionWorkerOtelServiceName))); + } + + private static boolean hasMetrics(ChaosContainer<?> container, int port, String metricName, + Pair<String, String> ... 
expectedLabels) { + var client = new PrometheusMetricsClient(container.getHost(), container.getMappedPort(port)); + var allMetrics = client.getMetrics(); + var actualMetrics = allMetrics.findByNameAndLabels(metricName, expectedLabels); + return !actualMetrics.isEmpty(); + } + + private static Map<String, String> getCollectorProps(String serviceName, String exporter, Review Comment: Why is this named Collector Properties? This is the configuration for the OTel Java SDK, no? ########## pulsar-otel-metrics-provider/src/main/java/org/apache/pulsar/common/stats/OpenTelemetryService.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.common.stats; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.resources.Resource; +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.Singular; +import org.apache.commons.lang3.StringUtils; + +/** + * Provides a common OpenTelemetry service for Pulsar components to use. Responsible for instantiating the OpenTelemetry + * SDK with a set of overrode properties. Once initialized, furnishes access to OpenTelemetry meters. + */ +public class OpenTelemetryService implements Closeable { + + public static final String OTEL_SDK_DISABLED = "otel.sdk.disabled"; + private static final String MAX_CARDINALITY_LIMIT_KEY = "otel.experimental.metrics.cardinality.limit"; + public static final int MAX_CARDINALITY_LIMIT = 10000; + + private final OpenTelemetrySdk openTelemetrySdk; + + @Builder + public OpenTelemetryService(String clusterName, + @Singular Map<String, String> extraProperties, + // Allows customizing the SDK builder; for testing purposes only. + @VisibleForTesting AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder) { + checkArgument(StringUtils.isNotEmpty(clusterName), "Cluster name cannot be empty"); + if (sdkBuilder == null) { + sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder(); + } + + Map<String, String> overrideProperties = new HashMap<>(); + overrideProperties.put(OTEL_SDK_DISABLED, "true"); + // Cardinality limit property is exclusive, so we need to add 1. 
Review Comment: It's not because it's exclusive; it's because the SDK must reserve one attribute set for the overflow series once you reach the max :) ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/MetricsTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.tests.integration.metrics; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer; +import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class MetricsTest { + + /* + * Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector. 
+ * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsOtlpExport() throws Exception { + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName); + + var exporter = "otlp"; + var otlpEndpointProp = + Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint()); + + var brokerOtelServiceName = clusterName + "-broker"; + var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, otlpEndpointProp); + + var proxyOtelServiceName = clusterName + "-proxy"; + var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, otlpEndpointProp); + + var functionWorkerServiceNameSuffix = PulsarTestBase.randomName(); + var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix; + var functionWorkerCollectorProps = getCollectorProps(functionWorkerOtelServiceName, exporter, otlpEndpointProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .proxyEnvs(proxyCollectorProps) + .externalService("otel-collector", openTelemetryCollectorContainer) + .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1); + + // TODO: Validate cluster name is present once + // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved. + var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK. 
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", brokerOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", proxyOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", functionWorkerOtelServiceName))); + } + + /* + * Validate that the OpenTelemetry metrics can be exported to a local Prometheus endpoint running in the same + * process space as the broker/proxy/function-worker. + * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsPrometheusExport() throws Exception { + var prometheusExporterPort = 9464; + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + + var exporter = "prometheus"; + var prometheusExporterPortProp = + Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", Integer.toString(prometheusExporterPort)); + + var brokerOtelServiceName = clusterName + "-broker"; + var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, prometheusExporterPortProp); + + var proxyOtelServiceName = clusterName + "-proxy"; + var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, prometheusExporterPortProp); + + var functionWorkerServiceNameSuffix = PulsarTestBase.randomName(); + var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix; + var 
functionWorkerCollectorProps = + getCollectorProps(functionWorkerOtelServiceName, exporter, prometheusExporterPortProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .brokerAdditionalPorts(List.of(prometheusExporterPort)) + .proxyEnvs(proxyCollectorProps) + .proxyAdditionalPorts(List.of(prometheusExporterPort)) + .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps) + .functionWorkerAdditionalPorts(functionWorkerServiceNameSuffix, List.of(prometheusExporterPort)) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1); + var workerContainer = pulsarCluster.getAnyWorker(); + + var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK. + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(pulsarCluster.getAnyBroker(), prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", brokerOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(pulsarCluster.getProxy(), prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", proxyOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(workerContainer, prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", functionWorkerOtelServiceName))); + } + + private static boolean hasMetrics(ChaosContainer<?> container, int port, String metricName, + Pair<String, String> ... 
expectedLabels) { + var client = new PrometheusMetricsClient(container.getHost(), container.getMappedPort(port)); + var allMetrics = client.getMetrics(); + var actualMetrics = allMetrics.findByNameAndLabels(metricName, expectedLabels); + return !actualMetrics.isEmpty(); + } + + private static Map<String, String> getCollectorProps(String serviceName, String exporter, + Pair<String, String> ... extraProps) { + var defaultProps = Map.of( + "OTEL_SDK_DISABLED", "false", + "OTEL_METRIC_EXPORT_INTERVAL", "1000", + "OTEL_SERVICE_NAME", serviceName, Review Comment: This property should be set by the service itself (broker, proxy, etc.). ########## pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/PulsarWorkerOpenTelemetry.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.functions.instance.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.common.stats.OpenTelemetryService; +import org.apache.pulsar.functions.instance.InstanceConfig; + +public class PulsarWorkerOpenTelemetry implements Closeable { + + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarWorkerOpenTelemetry(InstanceConfig instanceConfig) { + openTelemetryService = OpenTelemetryService.builder().clusterName(instanceConfig.getClusterName()).build(); Review Comment: I just realized there is one thing I didn't consider. Our plan is to recommend that users use OTLP as the exporter, since Prometheus memory allocation is out of our hands. Suppose they use OTLP to send the data to a Collector, and then use Prometheus to scrape the metrics from the Collector. 1. We need to know how it all works: how is a host added? This is especially needed when we write the documentation. A common example would be people who use Prometheus but use OTLP as the means to send the metrics out of Pulsar. 2. I just noticed we need to specify two well-known attributes: `service.name` and `service.version`. The name should be "pulsar-broker" / "pulsar-function-worker" / "pulsar-proxy". The version should be the version of the Pulsar broker/proxy/function worker. ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/MetricsTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.metrics; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer; +import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class MetricsTest { + + /* + * Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector. 
+ * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsOtlpExport() throws Exception { + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName); + + var exporter = "otlp"; + var otlpEndpointProp = + Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint()); + + var brokerOtelServiceName = clusterName + "-broker"; Review Comment: Why is the cluster name tied to the service name? ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/MetricsTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.tests.integration.metrics; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer; +import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class MetricsTest { Review Comment: How about `OpenTelemetrySanityE2ETest`? ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/MetricsTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.tests.integration.metrics; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer; +import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class MetricsTest { + + /* + * Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector. 
+ * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsOtlpExport() throws Exception { + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName); + + var exporter = "otlp"; + var otlpEndpointProp = + Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint()); + + var brokerOtelServiceName = clusterName + "-broker"; + var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, otlpEndpointProp); + + var proxyOtelServiceName = clusterName + "-proxy"; + var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, otlpEndpointProp); + + var functionWorkerServiceNameSuffix = PulsarTestBase.randomName(); + var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix; + var functionWorkerCollectorProps = getCollectorProps(functionWorkerOtelServiceName, exporter, otlpEndpointProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .proxyEnvs(proxyCollectorProps) + .externalService("otel-collector", openTelemetryCollectorContainer) + .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1); Review Comment: Do you do this after the cluster has started? ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/MetricsTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.metrics; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer; +import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +public class MetricsTest { + + /* + * Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector. 
+ * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsOtlpExport() throws Exception { + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName); + + var exporter = "otlp"; + var otlpEndpointProp = + Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint()); + + var brokerOtelServiceName = clusterName + "-broker"; + var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, otlpEndpointProp); + + var proxyOtelServiceName = clusterName + "-proxy"; + var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, otlpEndpointProp); + + var functionWorkerServiceNameSuffix = PulsarTestBase.randomName(); + var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix; + var functionWorkerCollectorProps = getCollectorProps(functionWorkerOtelServiceName, exporter, otlpEndpointProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .proxyEnvs(proxyCollectorProps) + .externalService("otel-collector", openTelemetryCollectorContainer) + .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1); + + // TODO: Validate cluster name is present once + // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved. + var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK. 
+ Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", brokerOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", proxyOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT, + metricName, Pair.of("job", functionWorkerOtelServiceName))); + } + + /* + * Validate that the OpenTelemetry metrics can be exported to a local Prometheus endpoint running in the same + * process space as the broker/proxy/function-worker. + * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter + */ + @Test(timeOut = 360_000) + public void testOpenTelemetryMetricsPrometheusExport() throws Exception { + var prometheusExporterPort = 9464; + var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID(); + + var exporter = "prometheus"; + var prometheusExporterPortProp = + Pair.of("OTEL_EXPORTER_PROMETHEUS_PORT", Integer.toString(prometheusExporterPort)); + + var brokerOtelServiceName = clusterName + "-broker"; + var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, prometheusExporterPortProp); + + var proxyOtelServiceName = clusterName + "-proxy"; + var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, prometheusExporterPortProp); + + var functionWorkerServiceNameSuffix = PulsarTestBase.randomName(); + var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix; + var 
functionWorkerCollectorProps = + getCollectorProps(functionWorkerOtelServiceName, exporter, prometheusExporterPortProp); + + var spec = PulsarClusterSpec.builder() + .clusterName(clusterName) + .brokerEnvs(brokerCollectorProps) + .brokerAdditionalPorts(List.of(prometheusExporterPort)) + .proxyEnvs(proxyCollectorProps) + .proxyAdditionalPorts(List.of(prometheusExporterPort)) + .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps) + .functionWorkerAdditionalPorts(functionWorkerServiceNameSuffix, List.of(prometheusExporterPort)) + .build(); + @Cleanup("stop") + var pulsarCluster = PulsarCluster.forSpec(spec); + pulsarCluster.start(); + + pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1); + var workerContainer = pulsarCluster.getAnyWorker(); + + var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK. + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(pulsarCluster.getAnyBroker(), prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", brokerOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(pulsarCluster.getProxy(), prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", proxyOtelServiceName))); + Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> + hasMetrics(workerContainer, prometheusExporterPort, metricName, + Pair.of("pulsar_cluster", clusterName), + Pair.of("service_name", functionWorkerOtelServiceName))); + } + + private static boolean hasMetrics(ChaosContainer<?> container, int port, String metricName, Review Comment: I would separate it into two methods: `Metric getMetricsFromPrometheusPort(container, port)`, so that the validation reads more fluently by
directly using `findByNameAndLabels` ########## pulsar-otel-metrics-provider/pom.xml: ########## @@ -0,0 +1,113 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar</artifactId> + <version>3.3.0-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-otel-metrics-provider</artifactId> + <description>OpenTelemetry integration provider</description> + + <dependencies> + <!-- OpenTelemetry dependencies --> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-exporter-otlp</artifactId> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-exporter-prometheus</artifactId> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk</artifactId> + </dependency> + <dependency> + <groupId>io.opentelemetry</groupId> + <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> + </dependency> + + 
<dependency> Review Comment: Why is this needed in this module? ########## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.common.stats.OpenTelemetryService; +import org.apache.pulsar.proxy.server.ProxyService; + +public class PulsarProxyOpenTelemetry implements Closeable { + + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarProxyOpenTelemetry(ProxyService proxyService) { + openTelemetryService = + OpenTelemetryService.builder().clusterName(proxyService.getConfiguration().getClusterName()).build(); + meter = openTelemetryService.getMeter("pulsar.proxy"); Review Comment: Same comment about naming: https://github.com/apache/pulsar/blob/master/pip/pip-320.md#meter ########## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/PulsarProxyOpenTelemetry.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.proxy.stats; + +import io.opentelemetry.api.metrics.Meter; +import java.io.Closeable; +import lombok.Getter; +import org.apache.pulsar.common.stats.OpenTelemetryService; +import org.apache.pulsar.proxy.server.ProxyService; + +public class PulsarProxyOpenTelemetry implements Closeable { + + private final OpenTelemetryService openTelemetryService; + + @Getter + private final Meter meter; + + public PulsarProxyOpenTelemetry(ProxyService proxyService) { Review Comment: We only need the cluster name or configuration, not the entire `ProxyService` ########## tests/integration/pom.xml: ########## @@ -55,6 +55,13 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> Review Comment: Why is this needed? ########## pulsar-otel-metrics-provider/src/test/java/org/apache/pulsar/common/stats/OpenTelemetryServiceTest.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.common.stats; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.metrics.internal.state.MetricStorage; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.function.Predicate; +import lombok.Cleanup; +import org.apache.commons.lang3.StringUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryServiceTest { + + private OpenTelemetryService openTelemetryService; + private InMemoryMetricReader reader; + private Meter meter; + + @BeforeMethod + public void setup() throws Exception { + reader = InMemoryMetricReader.create(); + openTelemetryService = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("openTelemetryServiceTestCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + meter = openTelemetryService.getMeter("openTelemetryServiceTestInstrument"); + } + + @AfterMethod + public void teardown() throws Exception { + openTelemetryService.close(); + reader.close(); + } + + // Overrides the default sdkBuilder to include the InMemoryMetricReader for testing purposes. 
+ private static AutoConfiguredOpenTelemetrySdkBuilder getSdkBuilder(MetricReader extraReader) { + return AutoConfiguredOpenTelemetrySdk.builder(). + addMeterProviderCustomizer((sdkMeterProviderBuilder, configProperties) -> { + SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector( + sdkMeterProviderBuilder, extraReader, + // Override the max cardinality limit for this extra reader. + instrumentType -> OpenTelemetryService.MAX_CARDINALITY_LIMIT + 1); + return sdkMeterProviderBuilder; + }); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeNull() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClusterNameCannotBeEmpty() throws Exception { + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder().clusterName(StringUtils.EMPTY).build(); + } + + @Test + public void testIsClusterNameSet() throws Exception { + @Cleanup + InMemoryMetricReader reader = InMemoryMetricReader.create(); + + @Cleanup + OpenTelemetryService ots = OpenTelemetryService.builder(). + sdkBuilder(getSdkBuilder(reader)). + clusterName("testCluster"). + extraProperty(OpenTelemetryService.OTEL_SDK_DISABLED, "false"). + build(); + + Predicate<MetricData> predicate = MetricDataMatcher.builder(). + resourceAttribute(Attributes.of(AttributeKey.stringKey("pulsar.cluster"), "testCluster")). + build(); + + Collection<MetricData> metricData = reader.collectAllMetrics(); + Assert.assertTrue(metricData.stream().anyMatch(predicate)); Review Comment: Don't you prefer assertThat()? 
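Several comments in this review (how a host is added when metrics go OTLP → Collector → Prometheus, why the cluster name is tied to the service name, and the `pulsar.cluster` resource attribute checked in the test above) depend on how resource attributes surface as Prometheus labels. The OTLP E2E test matches on `job`, and the Prometheus E2E test matches on `pulsar_cluster` and `service_name`. Below is a rough, stdlib-only model of that mapping, based on my reading of the OpenTelemetry Prometheus compatibility rules: `job` is derived from `service.namespace`/`service.name`, and `instance` from `service.instance.id`. Copying every resource attribute as a sanitized label is an assumption; whether that happens depends on the exporter (for example, the Collector's `resource_to_telemetry_conversion` setting). `PrometheusLabelMapping` is a hypothetical name, not an existing Pulsar or OpenTelemetry class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only: not a Pulsar, OpenTelemetry SDK, or Collector API.
// A rough model of how OTel resource attributes could surface as Prometheus labels.
final class PrometheusLabelMapping {

    private PrometheusLabelMapping() {
    }

    // Prometheus label names must match [a-zA-Z_][a-zA-Z0-9_]*: every other character
    // (e.g. the '.' in "pulsar.cluster") becomes '_', and a leading digit gets a '_' prefix.
    static String sanitizeLabelName(String key) {
        StringBuilder sb = new StringBuilder(key.length() + 1);
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            boolean valid = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9') || c == '_';
            sb.append(valid ? c : '_');
        }
        if (sb.length() > 0 && sb.charAt(0) >= '0' && sb.charAt(0) <= '9') {
            sb.insert(0, '_');
        }
        return sb.toString();
    }

    // job      = "<service.namespace>/<service.name>", or just service.name without a namespace
    // instance = service.instance.id
    // Assumption: every resource attribute is also copied as a sanitized label, as an exporter
    // configured to promote resource attributes would do; derived job/instance are never overwritten.
    static Map<String, String> toLabels(Map<String, String> resourceAttributes) {
        Map<String, String> labels = new LinkedHashMap<>();
        String name = resourceAttributes.get("service.name");
        String namespace = resourceAttributes.get("service.namespace");
        if (name != null) {
            labels.put("job", (namespace == null || namespace.isEmpty()) ? name : namespace + "/" + name);
        }
        String instanceId = resourceAttributes.get("service.instance.id");
        if (instanceId != null) {
            labels.put("instance", instanceId);
        }
        for (Map.Entry<String, String> attribute : resourceAttributes.entrySet()) {
            labels.putIfAbsent(sanitizeLabelName(attribute.getKey()), attribute.getValue());
        }
        return labels;
    }

    public static void main(String[] args) {
        Map<String, String> resource = new LinkedHashMap<>();
        resource.put("service.name", "pulsar-broker");
        resource.put("pulsar.cluster", "test-cluster");
        // prints {job=pulsar-broker, service_name=pulsar-broker, pulsar_cluster=test-cluster}
        System.out.println(toLabels(resource));
    }
}
```

Under this model, embedding the cluster name in `service.name` (as `clusterName + "-broker"` does in the E2E test) produces a different `job` for every cluster. A fixed name such as `pulsar-broker` plus the `pulsar.cluster` attribute keeps the two dimensions as separate labels.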
########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java: ########## @@ -94,6 +94,8 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContai private final ProxyContainer proxyContainer; private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap(); private Map<String, Map<String, String>> externalServiceEnvs; + private Map<String, Map<String, String>> functionWorkerEnvs; Review Comment: Why `String -> Map<String, String>`? What does the first String stand for? ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.tests.integration.containers; + +import java.time.Duration; +import org.apache.http.HttpStatus; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.MountableFile; + +public class OpenTelemetryCollectorContainer extends ChaosContainer<OpenTelemetryCollectorContainer> { + + private static final String IMAGE_NAME = "otel/opentelemetry-collector-contrib:latest"; + private static final String NAME = "otel-collector"; + + public static final int PROMETHEUS_EXPORTER_PORT = 8889; + private static final int OTLP_RECEIVER_PORT = 4317; + private static final int ZPAGES_PORT = 55679; + + public OpenTelemetryCollectorContainer(String clusterName) { + super(clusterName, IMAGE_NAME); + } + + @Override + protected void configure() { + super.configure(); + + this.withCopyFileToContainer( + MountableFile.forClasspathResource("containers/otel-collector-config.yaml", 0644), + "/etc/otel-collector-config.yaml") + .withCommand("--config=/etc/otel-collector-config.yaml") + .withExposedPorts(OTLP_RECEIVER_PORT, PROMETHEUS_EXPORTER_PORT, ZPAGES_PORT) + .withCreateContainerCmdModifier(createContainerCmd -> { + createContainerCmd.withHostName(NAME); Review Comment: What is this for? ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/OpenTelemetryCollectorContainer.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.containers; + +import java.time.Duration; +import org.apache.http.HttpStatus; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.utility.MountableFile; + +public class OpenTelemetryCollectorContainer extends ChaosContainer<OpenTelemetryCollectorContainer> { Review Comment: ChaosContainer? Why? ########## tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/MetricsTest.java: ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pulsar.tests.integration.metrics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.containers.OpenTelemetryCollectorContainer;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+public class MetricsTest {
+
+    /*
+     * Validate that the OpenTelemetry metrics can be exported to a remote OpenTelemetry collector.
+     * https://github.com/open-telemetry/opentelemetry-java/blob/main/sdk-extensions/autoconfigure/README.md#prometheus-exporter
+     */
+    @Test(timeOut = 360_000)
+    public void testOpenTelemetryMetricsOtlpExport() throws Exception {
+        var clusterName = "testOpenTelemetryMetrics-" + UUID.randomUUID();
+        var openTelemetryCollectorContainer = new OpenTelemetryCollectorContainer(clusterName);
+
+        var exporter = "otlp";
+        var otlpEndpointProp =
+                Pair.of("OTEL_EXPORTER_OTLP_ENDPOINT", openTelemetryCollectorContainer.getOtlpEndpoint());
+
+        var brokerOtelServiceName = clusterName + "-broker";
+        var brokerCollectorProps = getCollectorProps(brokerOtelServiceName, exporter, otlpEndpointProp);
+
+        var proxyOtelServiceName = clusterName + "-proxy";
+        var proxyCollectorProps = getCollectorProps(proxyOtelServiceName, exporter, otlpEndpointProp);
+
+        var functionWorkerServiceNameSuffix = PulsarTestBase.randomName();
+        var functionWorkerOtelServiceName = "function-worker-" + functionWorkerServiceNameSuffix;
+        var functionWorkerCollectorProps = getCollectorProps(functionWorkerOtelServiceName, exporter, otlpEndpointProp);
+
+        var spec = PulsarClusterSpec.builder()
+                .clusterName(clusterName)
+                .brokerEnvs(brokerCollectorProps)
+                .proxyEnvs(proxyCollectorProps)
+                .externalService("otel-collector", openTelemetryCollectorContainer)
+                .functionWorkerEnv(functionWorkerServiceNameSuffix, functionWorkerCollectorProps)
+                .build();
+        @Cleanup("stop")
+        var pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        pulsarCluster.setupFunctionWorkers(functionWorkerServiceNameSuffix, FunctionRuntimeType.PROCESS, 1);
+
+        // TODO: Validate cluster name is present once
+        // https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
+        var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
+        Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() ->
+                hasMetrics(openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT,

Review Comment:
   It's a bit strange that `hasMetrics()` needs the container and the static port as parameters.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
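The `getCollectorProps(...)` helper that `MetricsTest` calls for the broker, the proxy and the function worker lies outside the quoted hunk. As a hedged illustration of how OTLP export can be wired purely through the environment variables read by the OpenTelemetry SDK autoconfigure module (the README linked in the test), here is a minimal sketch. The class name `CollectorProps`, the exact set of variables, and the use of `Map.Entry` in place of commons-lang3 `Pair` (to keep the sketch dependency-free) are assumptions, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the real MetricsTest helper is not shown in the quoted hunk.
public final class CollectorProps {

    private CollectorProps() {
    }

    // Builds environment variables consumed by the OpenTelemetry SDK autoconfigure module.
    // Map.Entry stands in for commons-lang3 Pair so the sketch compiles without extra jars.
    @SafeVarargs
    public static Map<String, String> getCollectorProps(String serviceName, String exporter,
                                                        Map.Entry<String, String>... extraProps) {
        var props = new HashMap<String, String>();
        // Assumption: the process under test keeps the SDK disabled unless this is set to "false".
        props.put("OTEL_SDK_DISABLED", "false");
        // Identifies the emitting process (broker, proxy, function worker) in exported data.
        props.put("OTEL_SERVICE_NAME", serviceName);
        // Selects the metrics exporter, e.g. "otlp" as in the test above.
        props.put("OTEL_METRICS_EXPORTER", exporter);
        // Export every 1000 ms instead of the SDK default of 60000 ms, so the test waits less.
        props.put("OTEL_METRIC_EXPORT_INTERVAL", "1000");
        // Caller-supplied entries (e.g. OTEL_EXPORTER_OTLP_ENDPOINT) are added last and win on conflict.
        for (var prop : extraProps) {
            props.put(prop.getKey(), prop.getValue());
        }
        return props;
    }

    public static void main(String[] args) {
        // Example endpoint value only; the test obtains it from getOtlpEndpoint().
        var props = getCollectorProps("my-cluster-broker", "otlp",
                Map.entry("OTEL_EXPORTER_OTLP_ENDPOINT", "http://otel-collector:4317"));
        // Print in key order so the output is stable.
        new TreeMap<>(props).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Keeping the per-process values (service name, endpoint) as parameters lets the same defaults be reused for every Pulsar component while each one still reports under its own `OTEL_SERVICE_NAME`.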
