This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a6cee2b4f33 [feat][broker] PIP-264: Add schema registry metrics (#22624)
a6cee2b4f33 is described below

commit a6cee2b4f331a57429dfdbbfbec9777955855edb
Author: Dragos Misca <[email protected]>
AuthorDate: Mon Jun 3 02:20:01 2024 -0700

    [feat][broker] PIP-264: Add schema registry metrics (#22624)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../service/schema/SchemaRegistryService.java      |   6 +-
 .../service/schema/SchemaRegistryServiceImpl.java  |  37 ++--
 .../broker/service/schema/SchemaRegistryStats.java | 198 ++++++++++++++-------
 .../org/apache/pulsar/TestNGInstanceOrder.java     |  38 ++++
 .../broker/service/schema/SchemaServiceTest.java   | 118 +++++++++---
 .../apache/pulsar/client/api/SimpleSchemaTest.java |   3 +
 7 files changed, 291 insertions(+), 111 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 722bfda426d..2e9f9dc6b01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -869,7 +869,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             schemaStorage = createAndStartSchemaStorage();
             schemaRegistryService = SchemaRegistryService.create(
-                    schemaStorage, 
config.getSchemaRegistryCompatibilityCheckers(), this.executor);
+                    schemaStorage, 
config.getSchemaRegistryCompatibilityCheckers(), this);
 
             OffloadPoliciesImpl defaultOffloadPolicies =
                     
OffloadPoliciesImpl.create(this.getConfiguration().getProperties());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 3c5e3aae7ff..2a2467d3947 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service.schema;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
+import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -44,13 +44,13 @@ public interface SchemaRegistryService extends 
SchemaRegistry {
     }
 
     static SchemaRegistryService create(SchemaStorage schemaStorage, 
Set<String> schemaRegistryCompatibilityCheckers,
-                                        ScheduledExecutorService scheduler) {
+                                        PulsarService pulsarService) {
         if (schemaStorage != null) {
             try {
                 Map<SchemaType, SchemaCompatibilityCheck> checkers = 
getCheckers(schemaRegistryCompatibilityCheckers);
                 checkers.put(SchemaType.KEY_VALUE, new 
KeyValueSchemaCompatibilityCheck(checkers));
                 return SchemaRegistryServiceWithSchemaDataValidator.of(
-                        new SchemaRegistryServiceImpl(schemaStorage, checkers, 
scheduler));
+                        new SchemaRegistryServiceImpl(schemaStorage, checkers, 
pulsarService));
             } catch (Exception e) {
                 LOG.warn("Unable to create schema registry storage, defaulting 
to empty storage", e);
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 903f57cb780..3e9e13b14fe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -38,7 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 import lombok.extern.slf4j.Slf4j;
@@ -47,6 +46,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
@@ -70,19 +70,19 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
 
     @VisibleForTesting
     SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
-                              Map<SchemaType, SchemaCompatibilityCheck> 
compatibilityChecks, Clock clock,
-                              ScheduledExecutorService scheduler) {
+                              Map<SchemaType, SchemaCompatibilityCheck> 
compatibilityChecks,
+                              Clock clock,
+                              PulsarService pulsarService) {
         this.schemaStorage = schemaStorage;
         this.compatibilityChecks = compatibilityChecks;
         this.clock = clock;
-        this.stats = SchemaRegistryStats.getInstance(scheduler);
+        this.stats = new SchemaRegistryStats(pulsarService);
     }
 
-    @VisibleForTesting
     SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
                               Map<SchemaType, SchemaCompatibilityCheck> 
compatibilityChecks,
-                              ScheduledExecutorService scheduler) {
-        this(schemaStorage, compatibilityChecks, Clock.systemUTC(), scheduler);
+                              PulsarService pulsarService) {
+        this(schemaStorage, compatibilityChecks, Clock.systemUTC(), 
pulsarService);
     }
 
     @Override
@@ -136,16 +136,17 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                     }
                 })
                 .whenComplete((v, t) -> {
+                    var latencyMs = this.clock.millis() - start;
                     if (t != null) {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Get schema failed", schemaId);
                         }
-                        this.stats.recordGetFailed(schemaId);
+                        this.stats.recordGetFailed(schemaId, latencyMs);
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug(null == v ? "[{}] Schema not found" : 
"[{}] Schema is present", schemaId);
                         }
-                        this.stats.recordGetLatency(schemaId, 
this.clock.millis() - start);
+                        this.stats.recordGetLatency(schemaId, latencyMs);
                     }
                 });
     }
@@ -157,10 +158,11 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         return schemaStorage.getAll(schemaId)
                 .thenCompose(schemas -> convertToSchemaAndMetadata(schemaId, 
schemas))
                 .whenComplete((v, t) -> {
+                    var latencyMs = this.clock.millis() - start;
                     if (t != null) {
-                        this.stats.recordGetFailed(schemaId);
+                        this.stats.recordListFailed(schemaId, latencyMs);
                     } else {
-                        this.stats.recordGetLatency(schemaId, 
this.clock.millis() - start);
+                        this.stats.recordListLatency(schemaId, latencyMs);
                     }
                 });
     }
@@ -228,10 +230,11 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                             return 
CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context));
                         });
                 }))).whenComplete((v, ex) -> {
+                    var latencyMs = this.clock.millis() - start.getValue();
                     if (ex != null) {
                         log.error("[{}] Put schema failed", schemaId, ex);
                         if (start.getValue() != 0) {
-                            this.stats.recordPutFailed(schemaId);
+                            this.stats.recordPutFailed(schemaId, latencyMs);
                         }
                         promise.completeExceptionally(ex);
                     } else {
@@ -261,14 +264,15 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         return schemaStorage
                 .put(schemaId, deletedEntry, new byte[]{})
                 .whenComplete((v, t) -> {
+                    var latencyMs = this.clock.millis() - start;
                     if (t != null) {
                         log.error("[{}] User {} delete schema failed", 
schemaId, user);
-                        this.stats.recordDelFailed(schemaId);
+                        this.stats.recordDelFailed(schemaId, latencyMs);
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] User {} delete schema finished", 
schemaId, user);
                         }
-                        this.stats.recordDelLatency(schemaId, 
this.clock.millis() - start);
+                        this.stats.recordDelLatency(schemaId, latencyMs);
                     }
                 });
     }
@@ -284,11 +288,12 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
 
         return schemaStorage.delete(schemaId, forcefully)
                 .whenComplete((v, t) -> {
+                    var latencyMs = this.clock.millis() - start;
                     if (t != null) {
-                        this.stats.recordDelFailed(schemaId);
+                        this.stats.recordDelFailed(schemaId, latencyMs);
                         log.error("[{}] Delete schema storage failed", 
schemaId);
                     } else {
-                        this.stats.recordDelLatency(schemaId, 
this.clock.millis() - start);
+                        this.stats.recordDelLatency(schemaId, latencyMs);
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Delete schema storage finished", 
schemaId);
                         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
index 32e9e368530..b1a7dc2a541 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
@@ -18,69 +18,111 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
-import io.prometheus.client.CollectorRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Summary;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.MetricsUtil;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 
 class SchemaRegistryStats implements AutoCloseable, Runnable {
     private static final String NAMESPACE = "namespace";
     private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999, 
0.9999, 1};
-    private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
 
-    private final Counter getOpsFailedCounter;
-    private final Counter putOpsFailedCounter;
-    private final Counter deleteOpsFailedCounter;
+    public static final AttributeKey<String> REQUEST_TYPE_KEY =
+            AttributeKey.stringKey("pulsar.schema_registry.request");
+    @VisibleForTesting
+    enum RequestType {
+        GET,
+        LIST,
+        PUT,
+        DELETE;
 
-    private final Counter compatibleCounter;
-    private final Counter incompatibleCounter;
-
-    private final Summary deleteOpsLatency;
-    private final Summary getOpsLatency;
-    private final Summary putOpsLatency;
+        public final Attributes attributes = Attributes.of(REQUEST_TYPE_KEY, 
name().toLowerCase());
+    }
 
-    private final Map<String, Long> namespaceAccess = new 
ConcurrentHashMap<>();
-    private ScheduledFuture<?> future;
+    public static final AttributeKey<String> RESPONSE_TYPE_KEY =
+            AttributeKey.stringKey("pulsar.schema_registry.response");
+    @VisibleForTesting
+    enum ResponseType {
+        SUCCESS,
+        FAILURE;
 
-    private static volatile SchemaRegistryStats instance;
+        public final Attributes attributes = Attributes.of(RESPONSE_TYPE_KEY, 
name().toLowerCase());
+    }
 
-    static synchronized SchemaRegistryStats 
getInstance(ScheduledExecutorService scheduler) {
-        if (null == instance) {
-            instance = new SchemaRegistryStats(scheduler);
-        }
+    public static final AttributeKey<String> COMPATIBILITY_CHECK_RESPONSE_KEY =
+            
AttributeKey.stringKey("pulsar.schema_registry.compatibility_check.response");
+    @VisibleForTesting
+    enum CompatibilityCheckResponse {
+        COMPATIBLE,
+        INCOMPATIBLE;
 
-        return instance;
+        public final Attributes attributes = 
Attributes.of(COMPATIBILITY_CHECK_RESPONSE_KEY, name().toLowerCase());
     }
 
-    private SchemaRegistryStats(ScheduledExecutorService scheduler) {
-        this.deleteOpsFailedCounter = 
Counter.build("pulsar_schema_del_ops_failed_total", "-")
-                .labelNames(NAMESPACE).create().register();
-        this.getOpsFailedCounter = 
Counter.build("pulsar_schema_get_ops_failed_total", "-")
-                .labelNames(NAMESPACE).create().register();
-        this.putOpsFailedCounter = 
Counter.build("pulsar_schema_put_ops_failed_total", "-")
-                .labelNames(NAMESPACE).create().register();
+    public static final String SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME =
+            "pulsar.broker.request.schema_registry.duration";
+    private final DoubleHistogram latencyHistogram;
 
-        this.compatibleCounter = 
Counter.build("pulsar_schema_compatible_total", "-")
-                .labelNames(NAMESPACE).create().register();
-        this.incompatibleCounter = 
Counter.build("pulsar_schema_incompatible_total", "-")
-                .labelNames(NAMESPACE).create().register();
+    public static final String COMPATIBLE_COUNTER_METRIC_NAME =
+            
"pulsar.broker.operation.schema_registry.compatibility_check.count";
+    private final LongCounter schemaCompatibilityCounter;
 
-        this.deleteOpsLatency = 
this.buildSummary("pulsar_schema_del_ops_latency", "-");
-        this.getOpsLatency = 
this.buildSummary("pulsar_schema_get_ops_latency", "-");
-        this.putOpsLatency = 
this.buildSummary("pulsar_schema_put_ops_latency", "-");
+    @PulsarDeprecatedMetric(newMetricName = 
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+    private static final Counter getOpsFailedCounter =
+            Counter.build("pulsar_schema_get_ops_failed_total", 
"-").labelNames(NAMESPACE).create().register();
+    @PulsarDeprecatedMetric(newMetricName = 
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+    private static final Counter putOpsFailedCounter =
+            Counter.build("pulsar_schema_put_ops_failed_total", 
"-").labelNames(NAMESPACE).create().register();
+    @PulsarDeprecatedMetric(newMetricName = 
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+    private static final Counter deleteOpsFailedCounter =
+            Counter.build("pulsar_schema_del_ops_failed_total", 
"-").labelNames(NAMESPACE).create().register();
 
-        if (null != scheduler) {
-            this.future = scheduler.scheduleAtFixedRate(this, 1, 1, 
TimeUnit.MINUTES);
-        }
+    @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME)
+    private static  final Counter compatibleCounter =
+            Counter.build("pulsar_schema_compatible_total", 
"-").labelNames(NAMESPACE).create().register();
+    @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME)
+    private static final Counter incompatibleCounter =
+            Counter.build("pulsar_schema_incompatible_total", 
"-").labelNames(NAMESPACE).create().register();
+
+    @PulsarDeprecatedMetric(newMetricName = 
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+    private static final Summary deleteOpsLatency = 
buildSummary("pulsar_schema_del_ops_latency", "-");
+
+    @PulsarDeprecatedMetric(newMetricName = 
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+    private static final Summary getOpsLatency = 
buildSummary("pulsar_schema_get_ops_latency", "-");
+
+    @PulsarDeprecatedMetric(newMetricName = 
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+    private static final Summary putOpsLatency = 
buildSummary("pulsar_schema_put_ops_latency", "-");
+
+    private final Map<String, Long> namespaceAccess = new 
ConcurrentHashMap<>();
+    private final ScheduledFuture<?> future;
+
+    public SchemaRegistryStats(PulsarService pulsarService) {
+        this.future = pulsarService.getExecutor().scheduleAtFixedRate(this, 1, 
1, TimeUnit.MINUTES);
+
+        var meter = pulsarService.getOpenTelemetry().getMeter();
+        latencyHistogram = 
meter.histogramBuilder(SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+                .setDescription("The duration of Schema Registry requests.")
+                .setUnit("s")
+                .build();
+        schemaCompatibilityCounter = 
meter.counterBuilder(COMPATIBLE_COUNTER_METRIC_NAME)
+                .setDescription("The number of Schema Registry compatibility 
check operations performed by the broker.")
+                .setUnit("{operation}")
+                .build();
     }
 
-    private Summary buildSummary(String name, String help) {
+    private static Summary buildSummary(String name, String help) {
         Summary.Builder builder = Summary.build(name, 
help).labelNames(NAMESPACE);
 
         for (double quantile : QUANTILES) {
@@ -90,38 +132,77 @@ class SchemaRegistryStats implements AutoCloseable, 
Runnable {
         return builder.create().register();
     }
 
-    void recordDelFailed(String schemaId) {
-        this.deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+    void recordDelFailed(String schemaId, long millis) {
+        deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+        recordOperationLatency(schemaId, millis, RequestType.DELETE, 
ResponseType.FAILURE);
+    }
+
+    void recordGetFailed(String schemaId, long millis) {
+        getOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+        recordOperationLatency(schemaId, millis, RequestType.GET, 
ResponseType.FAILURE);
     }
 
-    void recordGetFailed(String schemaId) {
-        this.getOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+    void recordListFailed(String schemaId, long millis) {
+        getOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+        recordOperationLatency(schemaId, millis, RequestType.LIST, 
ResponseType.FAILURE);
     }
 
-    void recordPutFailed(String schemaId) {
-        this.putOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+    void recordPutFailed(String schemaId, long millis) {
+        putOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+        recordOperationLatency(schemaId, millis, RequestType.PUT, 
ResponseType.FAILURE);
     }
 
     void recordDelLatency(String schemaId, long millis) {
-        this.deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+        deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+        recordOperationLatency(schemaId, millis, RequestType.DELETE, 
ResponseType.SUCCESS);
     }
 
     void recordGetLatency(String schemaId, long millis) {
-        this.getOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+        getOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+        recordOperationLatency(schemaId, millis, RequestType.GET, 
ResponseType.SUCCESS);
+    }
+
+    void recordListLatency(String schemaId, long millis) {
+        getOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+        recordOperationLatency(schemaId, millis, RequestType.LIST, 
ResponseType.SUCCESS);
     }
 
     void recordPutLatency(String schemaId, long millis) {
-        this.putOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+        putOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+        recordOperationLatency(schemaId, millis, RequestType.PUT, 
ResponseType.SUCCESS);
+    }
+
+    private void recordOperationLatency(String schemaId, long millis,
+                                        RequestType requestType, ResponseType 
responseType) {
+        var duration = MetricsUtil.convertToSeconds(millis, 
TimeUnit.MILLISECONDS);
+        var namespace = getNamespace(schemaId);
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, namespace)
+                .putAll(requestType.attributes)
+                .putAll(responseType.attributes)
+                .build();
+        latencyHistogram.record(duration, attributes);
     }
 
     void recordSchemaIncompatible(String schemaId) {
-        this.incompatibleCounter.labels(getNamespace(schemaId)).inc();
+        var namespace = getNamespace(schemaId);
+        incompatibleCounter.labels(namespace).inc();
+        recordSchemaCompabilityResult(namespace, 
CompatibilityCheckResponse.INCOMPATIBLE);
     }
 
     void recordSchemaCompatible(String schemaId) {
-        this.compatibleCounter.labels(getNamespace(schemaId)).inc();
+        var namespace = getNamespace(schemaId);
+        compatibleCounter.labels(namespace).inc();
+        recordSchemaCompabilityResult(namespace, 
CompatibilityCheckResponse.COMPATIBLE);
     }
 
+    private void recordSchemaCompabilityResult(String namespace, 
CompatibilityCheckResponse result) {
+        var attributes = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, namespace)
+                .putAll(result.attributes)
+                .build();
+        schemaCompatibilityCounter.add(1, attributes);
+    }
 
     private String getNamespace(String schemaId) {
         String namespace;
@@ -148,20 +229,9 @@ class SchemaRegistryStats implements AutoCloseable, 
Runnable {
     }
 
     @Override
-    public void close() throws Exception {
-        if (CLOSED.compareAndSet(false, true)) {
-            
CollectorRegistry.defaultRegistry.unregister(this.deleteOpsFailedCounter);
-            
CollectorRegistry.defaultRegistry.unregister(this.getOpsFailedCounter);
-            
CollectorRegistry.defaultRegistry.unregister(this.putOpsFailedCounter);
-            
CollectorRegistry.defaultRegistry.unregister(this.compatibleCounter);
-            
CollectorRegistry.defaultRegistry.unregister(this.incompatibleCounter);
-            
CollectorRegistry.defaultRegistry.unregister(this.deleteOpsLatency);
-            CollectorRegistry.defaultRegistry.unregister(this.getOpsLatency);
-            CollectorRegistry.defaultRegistry.unregister(this.putOpsLatency);
-            if (null != this.future) {
-                this.future.cancel(false);
-            }
-        }
+    public synchronized void close() throws Exception {
+        namespaceAccess.keySet().forEach(this::removeChild);
+        future.cancel(false);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java
new file mode 100644
index 00000000000..50c9863d586
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import java.util.Comparator;
+import java.util.List;
+import org.testng.IMethodInstance;
+import org.testng.IMethodInterceptor;
+import org.testng.ITestContext;
+
+// Sorts the test methods by test object instance hashcode, then priority, then method name. Useful when Factory
+// generated tests interfere with each other.
+public class TestNGInstanceOrder implements IMethodInterceptor {
+    @Override
+    public List<IMethodInstance> intercept(List<IMethodInstance> methods, 
ITestContext context) {
+        return 
methods.stream().sorted(Comparator.<IMethodInstance>comparingInt(o -> 
o.getInstance().hashCode())
+                        .thenComparingInt(o -> 
o.getMethod().getInterceptedPriority())
+                        .thenComparingInt(o -> o.getMethod().getPriority())
+                        .thenComparing(o -> o.getMethod().getMethodName()))
+                .toList();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index fbf8c5cc154..658ea268c64 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -18,19 +18,23 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD;
+import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertNull;
-import static org.testng.AssertJUnit.assertTrue;
 import com.google.common.collect.Multimap;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import io.opentelemetry.api.common.Attributes;
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.time.Clock;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -38,15 +42,17 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -58,6 +64,9 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -66,7 +75,7 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
 
-    private static final Clock MockClock = Clock.fixed(Instant.EPOCH, 
ZoneId.systemDefault());
+    private static final Clock MockClock = Clock.fixed(Instant.now(), 
ZoneId.systemDefault());
 
     private final String schemaId1 = "1/2/3/4";
     private static final String userId = "user";
@@ -99,10 +108,23 @@ public class SchemaServiceTest extends 
MockedPulsarServiceBaseTest {
         storage.start();
         Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
         checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
-        schemaRegistryService = new SchemaRegistryServiceImpl(storage, 
checkMap, MockClock, null);
+        schemaRegistryService = new SchemaRegistryServiceImpl(storage, 
checkMap, MockClock, pulsar);
+
+        var schemaRegistryStats =
+                Mockito.spy((SchemaRegistryStats) 
FieldUtils.readField(schemaRegistryService, "stats", true));
+        // Disable periodic cleanup of Prometheus entries.
+        Mockito.doNothing().when(schemaRegistryStats).run();
+        FieldUtils.writeField(schemaRegistryService, "stats", 
schemaRegistryStats, true);
+
         setupDefaultTenantAndNamespace();
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
@@ -118,6 +140,32 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         getSchema(schemaId, version(0));
         deleteSchema(schemaId, version(1));
 
+        var otelMetrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertThat(otelMetrics).anySatisfy(metric -> assertThat(metric)
+                .hasName(SchemaRegistryStats.SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+                .hasHistogramSatisfying(histogram -> histogram.hasPointsSatisfying(
+                        point -> point
+                                .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+                                        SchemaRegistryStats.REQUEST_TYPE_KEY, "delete",
+                                        SchemaRegistryStats.RESPONSE_TYPE_KEY, "success"))
+                                .hasCount(1),
+                        point -> point
+                                .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+                                        SchemaRegistryStats.REQUEST_TYPE_KEY, "put",
+                                        SchemaRegistryStats.RESPONSE_TYPE_KEY, "success"))
+                                .hasCount(1),
+                        point -> point
+                                .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+                                        SchemaRegistryStats.REQUEST_TYPE_KEY, "list",
+                                        SchemaRegistryStats.RESPONSE_TYPE_KEY, "success"))
+                                .hasCount(1),
+                        point -> point
+                                .hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+                                        SchemaRegistryStats.REQUEST_TYPE_KEY, "get",
+                                        SchemaRegistryStats.RESPONSE_TYPE_KEY, "success"))
+                                .hasCount(1)
+                )));
+
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsTestUtil.generate(pulsar, false, false, false, output);
         output.flush();
@@ -309,16 +357,39 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         putSchema(schemaId1, schemaData2, version(1));
     }
 
-    @Test(expectedExceptions = ExecutionException.class)
+    @Test
     public void checkIsCompatible() throws Exception {
-        putSchema(schemaId1, schemaData1, version(0), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
-        putSchema(schemaId1, schemaData2, version(1), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
-
-        assertTrue(schemaRegistryService.isCompatible(schemaId1, schemaData3,
-                SchemaCompatibilityStrategy.BACKWARD).get());
-        assertFalse(schemaRegistryService.isCompatible(schemaId1, schemaData3,
-                SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE).get());
-        putSchema(schemaId1, schemaData3, version(2), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+        var schemaId = BrokerTestUtil.newUniqueName("tenant/ns/topic");
+        putSchema(schemaId, schemaData1, version(0), BACKWARD_TRANSITIVE);
+        putSchema(schemaId, schemaData2, version(1), BACKWARD_TRANSITIVE);
+
+        var timeout = Duration.ofSeconds(1);
+        assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3, BACKWARD))
+                .succeedsWithin(timeout, InstanceOfAssertFactories.BOOLEAN)
+                .isTrue();
+        assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3, BACKWARD_TRANSITIVE))
+                .failsWithin(timeout)
+                .withThrowableOfType(ExecutionException.class)
+                .withCauseInstanceOf(IncompatibleSchemaException.class);
+        assertThatThrownBy(() -> putSchema(schemaId, schemaData3, version(2), BACKWARD_TRANSITIVE))
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(IncompatibleSchemaException.class);
+
+        assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
+                .anySatisfy(metric -> assertThat(metric)
+                        .hasName(SchemaRegistryStats.COMPATIBLE_COUNTER_METRIC_NAME)
+                        .hasLongSumSatisfying(
+                                sum -> sum.hasPointsSatisfying(
+                                    point -> point
+                                            .hasAttributes(Attributes.of(
+                                                    OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+                                                    SchemaRegistryStats.COMPATIBILITY_CHECK_RESPONSE_KEY, "compatible"))
+                                            .hasValue(2),
+                                    point -> point
+                                            .hasAttributes(Attributes.of(
+                                                    OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+                                                    SchemaRegistryStats.COMPATIBILITY_CHECK_RESPONSE_KEY, "incompatible"))
+                                            .hasValue(2))));
     }
 
     @Test
@@ -374,20 +445,13 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         assertEquals(expectedVersion, version);
     }
 
-    private SchemaData randomSchema() {
-        UUID randomString = UUID.randomUUID();
-        return SchemaData.builder()
-            .user(userId)
-            .type(SchemaType.JSON)
-            .timestamp(MockClock.millis())
-            .isDeleted(false)
-            .data(randomString.toString().getBytes())
-            .props(new TreeMap<>())
-            .build();
-    }
-
     private static SchemaData getSchemaData(String schemaJson) {
-        return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).user(userId).build();
+        return SchemaData.builder()
+                .data(schemaJson.getBytes())
+                .type(SchemaType.AVRO)
+                .user(userId)
+                .timestamp(MockClock.millis())
+                .build();
     }
 
     private SchemaVersion version(long version) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index c8c7c3b2ccc..e006b72fad2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -41,6 +41,7 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema.Parser;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.pulsar.TestNGInstanceOrder;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
@@ -66,10 +67,12 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
+import org.testng.annotations.Listeners;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-api")
 @Slf4j
+@Listeners({ TestNGInstanceOrder.class })
 public class SimpleSchemaTest extends ProducerConsumerBase {
 
     private static final String NAMESPACE = "my-property/my-ns";


Reply via email to