This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a6cee2b4f33 [feat][broker] PIP-264: Add schema registry metrics
(#22624)
a6cee2b4f33 is described below
commit a6cee2b4f331a57429dfdbbfbec9777955855edb
Author: Dragos Misca <[email protected]>
AuthorDate: Mon Jun 3 02:20:01 2024 -0700
[feat][broker] PIP-264: Add schema registry metrics (#22624)
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../service/schema/SchemaRegistryService.java | 6 +-
.../service/schema/SchemaRegistryServiceImpl.java | 37 ++--
.../broker/service/schema/SchemaRegistryStats.java | 198 ++++++++++++++-------
.../org/apache/pulsar/TestNGInstanceOrder.java | 38 ++++
.../broker/service/schema/SchemaServiceTest.java | 118 +++++++++---
.../apache/pulsar/client/api/SimpleSchemaTest.java | 3 +
7 files changed, 291 insertions(+), 111 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 722bfda426d..2e9f9dc6b01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -869,7 +869,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
schemaStorage = createAndStartSchemaStorage();
schemaRegistryService = SchemaRegistryService.create(
- schemaStorage,
config.getSchemaRegistryCompatibilityCheckers(), this.executor);
+ schemaStorage,
config.getSchemaRegistryCompatibilityCheckers(), this);
OffloadPoliciesImpl defaultOffloadPolicies =
OffloadPoliciesImpl.create(this.getConfiguration().getProperties());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 3c5e3aae7ff..2a2467d3947 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service.schema;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
+import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.schema.SchemaType;
@@ -44,13 +44,13 @@ public interface SchemaRegistryService extends
SchemaRegistry {
}
static SchemaRegistryService create(SchemaStorage schemaStorage,
Set<String> schemaRegistryCompatibilityCheckers,
- ScheduledExecutorService scheduler) {
+ PulsarService pulsarService) {
if (schemaStorage != null) {
try {
Map<SchemaType, SchemaCompatibilityCheck> checkers =
getCheckers(schemaRegistryCompatibilityCheckers);
checkers.put(SchemaType.KEY_VALUE, new
KeyValueSchemaCompatibilityCheck(checkers));
return SchemaRegistryServiceWithSchemaDataValidator.of(
- new SchemaRegistryServiceImpl(schemaStorage, checkers,
scheduler));
+ new SchemaRegistryServiceImpl(schemaStorage, checkers,
pulsarService));
} catch (Exception e) {
LOG.warn("Unable to create schema registry storage, defaulting
to empty storage", e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 903f57cb780..3e9e13b14fe 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -38,7 +38,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
@@ -47,6 +46,7 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
@@ -70,19 +70,19 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
@VisibleForTesting
SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
- Map<SchemaType, SchemaCompatibilityCheck>
compatibilityChecks, Clock clock,
- ScheduledExecutorService scheduler) {
+ Map<SchemaType, SchemaCompatibilityCheck>
compatibilityChecks,
+ Clock clock,
+ PulsarService pulsarService) {
this.schemaStorage = schemaStorage;
this.compatibilityChecks = compatibilityChecks;
this.clock = clock;
- this.stats = SchemaRegistryStats.getInstance(scheduler);
+ this.stats = new SchemaRegistryStats(pulsarService);
}
- @VisibleForTesting
SchemaRegistryServiceImpl(SchemaStorage schemaStorage,
Map<SchemaType, SchemaCompatibilityCheck>
compatibilityChecks,
- ScheduledExecutorService scheduler) {
- this(schemaStorage, compatibilityChecks, Clock.systemUTC(), scheduler);
+ PulsarService pulsarService) {
+ this(schemaStorage, compatibilityChecks, Clock.systemUTC(),
pulsarService);
}
@Override
@@ -136,16 +136,17 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
}
})
.whenComplete((v, t) -> {
+ var latencyMs = this.clock.millis() - start;
if (t != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Get schema failed", schemaId);
}
- this.stats.recordGetFailed(schemaId);
+ this.stats.recordGetFailed(schemaId, latencyMs);
} else {
if (log.isDebugEnabled()) {
log.debug(null == v ? "[{}] Schema not found" :
"[{}] Schema is present", schemaId);
}
- this.stats.recordGetLatency(schemaId,
this.clock.millis() - start);
+ this.stats.recordGetLatency(schemaId, latencyMs);
}
});
}
@@ -157,10 +158,11 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
return schemaStorage.getAll(schemaId)
.thenCompose(schemas -> convertToSchemaAndMetadata(schemaId,
schemas))
.whenComplete((v, t) -> {
+ var latencyMs = this.clock.millis() - start;
if (t != null) {
- this.stats.recordGetFailed(schemaId);
+ this.stats.recordListFailed(schemaId, latencyMs);
} else {
- this.stats.recordGetLatency(schemaId,
this.clock.millis() - start);
+ this.stats.recordListLatency(schemaId, latencyMs);
}
});
}
@@ -228,10 +230,11 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
return
CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context));
});
}))).whenComplete((v, ex) -> {
+ var latencyMs = this.clock.millis() - start.getValue();
if (ex != null) {
log.error("[{}] Put schema failed", schemaId, ex);
if (start.getValue() != 0) {
- this.stats.recordPutFailed(schemaId);
+ this.stats.recordPutFailed(schemaId, latencyMs);
}
promise.completeExceptionally(ex);
} else {
@@ -261,14 +264,15 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
return schemaStorage
.put(schemaId, deletedEntry, new byte[]{})
.whenComplete((v, t) -> {
+ var latencyMs = this.clock.millis() - start;
if (t != null) {
log.error("[{}] User {} delete schema failed",
schemaId, user);
- this.stats.recordDelFailed(schemaId);
+ this.stats.recordDelFailed(schemaId, latencyMs);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] User {} delete schema finished",
schemaId, user);
}
- this.stats.recordDelLatency(schemaId,
this.clock.millis() - start);
+ this.stats.recordDelLatency(schemaId, latencyMs);
}
});
}
@@ -284,11 +288,12 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
return schemaStorage.delete(schemaId, forcefully)
.whenComplete((v, t) -> {
+ var latencyMs = this.clock.millis() - start;
if (t != null) {
- this.stats.recordDelFailed(schemaId);
+ this.stats.recordDelFailed(schemaId, latencyMs);
log.error("[{}] Delete schema storage failed",
schemaId);
} else {
- this.stats.recordDelLatency(schemaId,
this.clock.millis() - start);
+ this.stats.recordDelLatency(schemaId, latencyMs);
if (log.isDebugEnabled()) {
log.debug("[{}] Delete schema storage finished",
schemaId);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
index 32e9e368530..b1a7dc2a541 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryStats.java
@@ -18,69 +18,111 @@
*/
package org.apache.pulsar.broker.service.schema;
-import io.prometheus.client.CollectorRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.MetricsUtil;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
class SchemaRegistryStats implements AutoCloseable, Runnable {
private static final String NAMESPACE = "namespace";
private static final double[] QUANTILES = {0.50, 0.75, 0.95, 0.99, 0.999,
0.9999, 1};
- private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
- private final Counter getOpsFailedCounter;
- private final Counter putOpsFailedCounter;
- private final Counter deleteOpsFailedCounter;
+ public static final AttributeKey<String> REQUEST_TYPE_KEY =
+ AttributeKey.stringKey("pulsar.schema_registry.request");
+ @VisibleForTesting
+ enum RequestType {
+ GET,
+ LIST,
+ PUT,
+ DELETE;
- private final Counter compatibleCounter;
- private final Counter incompatibleCounter;
-
- private final Summary deleteOpsLatency;
- private final Summary getOpsLatency;
- private final Summary putOpsLatency;
+ public final Attributes attributes = Attributes.of(REQUEST_TYPE_KEY,
name().toLowerCase());
+ }
- private final Map<String, Long> namespaceAccess = new
ConcurrentHashMap<>();
- private ScheduledFuture<?> future;
+ public static final AttributeKey<String> RESPONSE_TYPE_KEY =
+ AttributeKey.stringKey("pulsar.schema_registry.response");
+ @VisibleForTesting
+ enum ResponseType {
+ SUCCESS,
+ FAILURE;
- private static volatile SchemaRegistryStats instance;
+ public final Attributes attributes = Attributes.of(RESPONSE_TYPE_KEY,
name().toLowerCase());
+ }
- static synchronized SchemaRegistryStats
getInstance(ScheduledExecutorService scheduler) {
- if (null == instance) {
- instance = new SchemaRegistryStats(scheduler);
- }
+ public static final AttributeKey<String> COMPATIBILITY_CHECK_RESPONSE_KEY =
+
AttributeKey.stringKey("pulsar.schema_registry.compatibility_check.response");
+ @VisibleForTesting
+ enum CompatibilityCheckResponse {
+ COMPATIBLE,
+ INCOMPATIBLE;
- return instance;
+ public final Attributes attributes =
Attributes.of(COMPATIBILITY_CHECK_RESPONSE_KEY, name().toLowerCase());
}
- private SchemaRegistryStats(ScheduledExecutorService scheduler) {
- this.deleteOpsFailedCounter =
Counter.build("pulsar_schema_del_ops_failed_total", "-")
- .labelNames(NAMESPACE).create().register();
- this.getOpsFailedCounter =
Counter.build("pulsar_schema_get_ops_failed_total", "-")
- .labelNames(NAMESPACE).create().register();
- this.putOpsFailedCounter =
Counter.build("pulsar_schema_put_ops_failed_total", "-")
- .labelNames(NAMESPACE).create().register();
+ public static final String SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME =
+ "pulsar.broker.request.schema_registry.duration";
+ private final DoubleHistogram latencyHistogram;
- this.compatibleCounter =
Counter.build("pulsar_schema_compatible_total", "-")
- .labelNames(NAMESPACE).create().register();
- this.incompatibleCounter =
Counter.build("pulsar_schema_incompatible_total", "-")
- .labelNames(NAMESPACE).create().register();
+ public static final String COMPATIBLE_COUNTER_METRIC_NAME =
+
"pulsar.broker.operation.schema_registry.compatibility_check.count";
+ private final LongCounter schemaCompatibilityCounter;
- this.deleteOpsLatency =
this.buildSummary("pulsar_schema_del_ops_latency", "-");
- this.getOpsLatency =
this.buildSummary("pulsar_schema_get_ops_latency", "-");
- this.putOpsLatency =
this.buildSummary("pulsar_schema_put_ops_latency", "-");
+ @PulsarDeprecatedMetric(newMetricName =
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ private static final Counter getOpsFailedCounter =
+ Counter.build("pulsar_schema_get_ops_failed_total",
"-").labelNames(NAMESPACE).create().register();
+ @PulsarDeprecatedMetric(newMetricName =
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ private static final Counter putOpsFailedCounter =
+ Counter.build("pulsar_schema_put_ops_failed_total",
"-").labelNames(NAMESPACE).create().register();
+ @PulsarDeprecatedMetric(newMetricName =
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ private static final Counter deleteOpsFailedCounter =
+ Counter.build("pulsar_schema_del_ops_failed_total",
"-").labelNames(NAMESPACE).create().register();
- if (null != scheduler) {
- this.future = scheduler.scheduleAtFixedRate(this, 1, 1,
TimeUnit.MINUTES);
- }
+ @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME)
+ private static final Counter compatibleCounter =
+ Counter.build("pulsar_schema_compatible_total",
"-").labelNames(NAMESPACE).create().register();
+ @PulsarDeprecatedMetric(newMetricName = COMPATIBLE_COUNTER_METRIC_NAME)
+ private static final Counter incompatibleCounter =
+ Counter.build("pulsar_schema_incompatible_total",
"-").labelNames(NAMESPACE).create().register();
+
+ @PulsarDeprecatedMetric(newMetricName =
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ private static final Summary deleteOpsLatency =
buildSummary("pulsar_schema_del_ops_latency", "-");
+
+ @PulsarDeprecatedMetric(newMetricName =
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ private static final Summary getOpsLatency =
buildSummary("pulsar_schema_get_ops_latency", "-");
+
+ @PulsarDeprecatedMetric(newMetricName =
SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ private static final Summary putOpsLatency =
buildSummary("pulsar_schema_put_ops_latency", "-");
+
+ private final Map<String, Long> namespaceAccess = new
ConcurrentHashMap<>();
+ private final ScheduledFuture<?> future;
+
+ public SchemaRegistryStats(PulsarService pulsarService) {
+ this.future = pulsarService.getExecutor().scheduleAtFixedRate(this, 1,
1, TimeUnit.MINUTES);
+
+ var meter = pulsarService.getOpenTelemetry().getMeter();
+ latencyHistogram =
meter.histogramBuilder(SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ .setDescription("The duration of Schema Registry requests.")
+ .setUnit("s")
+ .build();
+ schemaCompatibilityCounter =
meter.counterBuilder(COMPATIBLE_COUNTER_METRIC_NAME)
+ .setDescription("The number of Schema Registry compatibility
check operations performed by the broker.")
+ .setUnit("{operation}")
+ .build();
}
- private Summary buildSummary(String name, String help) {
+ private static Summary buildSummary(String name, String help) {
Summary.Builder builder = Summary.build(name,
help).labelNames(NAMESPACE);
for (double quantile : QUANTILES) {
@@ -90,38 +132,77 @@ class SchemaRegistryStats implements AutoCloseable,
Runnable {
return builder.create().register();
}
- void recordDelFailed(String schemaId) {
- this.deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+ void recordDelFailed(String schemaId, long millis) {
+ deleteOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+ recordOperationLatency(schemaId, millis, RequestType.DELETE,
ResponseType.FAILURE);
+ }
+
+ void recordGetFailed(String schemaId, long millis) {
+ getOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+ recordOperationLatency(schemaId, millis, RequestType.GET,
ResponseType.FAILURE);
}
- void recordGetFailed(String schemaId) {
- this.getOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+ void recordListFailed(String schemaId, long millis) {
+ getOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+ recordOperationLatency(schemaId, millis, RequestType.LIST,
ResponseType.FAILURE);
}
- void recordPutFailed(String schemaId) {
- this.putOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+ void recordPutFailed(String schemaId, long millis) {
+ putOpsFailedCounter.labels(getNamespace(schemaId)).inc();
+ recordOperationLatency(schemaId, millis, RequestType.PUT,
ResponseType.FAILURE);
}
void recordDelLatency(String schemaId, long millis) {
- this.deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+ deleteOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+ recordOperationLatency(schemaId, millis, RequestType.DELETE,
ResponseType.SUCCESS);
}
void recordGetLatency(String schemaId, long millis) {
- this.getOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+ getOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+ recordOperationLatency(schemaId, millis, RequestType.GET,
ResponseType.SUCCESS);
+ }
+
+ void recordListLatency(String schemaId, long millis) {
+ getOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+ recordOperationLatency(schemaId, millis, RequestType.LIST,
ResponseType.SUCCESS);
}
void recordPutLatency(String schemaId, long millis) {
- this.putOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+ putOpsLatency.labels(getNamespace(schemaId)).observe(millis);
+ recordOperationLatency(schemaId, millis, RequestType.PUT,
ResponseType.SUCCESS);
+ }
+
+ private void recordOperationLatency(String schemaId, long millis,
+ RequestType requestType, ResponseType
responseType) {
+ var duration = MetricsUtil.convertToSeconds(millis,
TimeUnit.MILLISECONDS);
+ var namespace = getNamespace(schemaId);
+ var attributes = Attributes.builder()
+ .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, namespace)
+ .putAll(requestType.attributes)
+ .putAll(responseType.attributes)
+ .build();
+ latencyHistogram.record(duration, attributes);
}
void recordSchemaIncompatible(String schemaId) {
- this.incompatibleCounter.labels(getNamespace(schemaId)).inc();
+ var namespace = getNamespace(schemaId);
+ incompatibleCounter.labels(namespace).inc();
+ recordSchemaCompabilityResult(namespace,
CompatibilityCheckResponse.INCOMPATIBLE);
}
void recordSchemaCompatible(String schemaId) {
- this.compatibleCounter.labels(getNamespace(schemaId)).inc();
+ var namespace = getNamespace(schemaId);
+ compatibleCounter.labels(namespace).inc();
+ recordSchemaCompabilityResult(namespace,
CompatibilityCheckResponse.COMPATIBLE);
}
+ private void recordSchemaCompabilityResult(String namespace,
CompatibilityCheckResponse result) {
+ var attributes = Attributes.builder()
+ .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, namespace)
+ .putAll(result.attributes)
+ .build();
+ schemaCompatibilityCounter.add(1, attributes);
+ }
private String getNamespace(String schemaId) {
String namespace;
@@ -148,20 +229,9 @@ class SchemaRegistryStats implements AutoCloseable,
Runnable {
}
@Override
- public void close() throws Exception {
- if (CLOSED.compareAndSet(false, true)) {
-
CollectorRegistry.defaultRegistry.unregister(this.deleteOpsFailedCounter);
-
CollectorRegistry.defaultRegistry.unregister(this.getOpsFailedCounter);
-
CollectorRegistry.defaultRegistry.unregister(this.putOpsFailedCounter);
-
CollectorRegistry.defaultRegistry.unregister(this.compatibleCounter);
-
CollectorRegistry.defaultRegistry.unregister(this.incompatibleCounter);
-
CollectorRegistry.defaultRegistry.unregister(this.deleteOpsLatency);
- CollectorRegistry.defaultRegistry.unregister(this.getOpsLatency);
- CollectorRegistry.defaultRegistry.unregister(this.putOpsLatency);
- if (null != this.future) {
- this.future.cancel(false);
- }
- }
+ public synchronized void close() throws Exception {
+ namespaceAccess.keySet().forEach(this::removeChild);
+ future.cancel(false);
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java
b/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java
new file mode 100644
index 00000000000..50c9863d586
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/TestNGInstanceOrder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import java.util.Comparator;
+import java.util.List;
+import org.testng.IMethodInstance;
+import org.testng.IMethodInterceptor;
+import org.testng.ITestContext;
+
+// Sorts the test methods by test object instance hashcode, then priority,
then method name. Useful when Factory
+// generated tests interfere with each other.
+public class TestNGInstanceOrder implements IMethodInterceptor {
+ @Override
+ public List<IMethodInstance> intercept(List<IMethodInstance> methods,
ITestContext context) {
+ return
methods.stream().sorted(Comparator.<IMethodInstance>comparingInt(o ->
o.getInstance().hashCode())
+ .thenComparingInt(o ->
o.getMethod().getInterceptedPriority())
+ .thenComparingInt(o -> o.getMethod().getPriority())
+ .thenComparing(o -> o.getMethod().getMethodName()))
+ .toList();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index fbf8c5cc154..658ea268c64 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -18,19 +18,23 @@
*/
package org.apache.pulsar.broker.service.schema;
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD;
+import static
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;
-import static org.testng.AssertJUnit.assertTrue;
import com.google.common.collect.Multimap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
+import io.opentelemetry.api.common.Attributes;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
+import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -38,15 +42,17 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -58,6 +64,9 @@ import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -66,7 +75,7 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
- private static final Clock MockClock = Clock.fixed(Instant.EPOCH,
ZoneId.systemDefault());
+ private static final Clock MockClock = Clock.fixed(Instant.now(),
ZoneId.systemDefault());
private final String schemaId1 = "1/2/3/4";
private static final String userId = "user";
@@ -99,10 +108,23 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
storage.start();
Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
- schemaRegistryService = new SchemaRegistryServiceImpl(storage,
checkMap, MockClock, null);
+ schemaRegistryService = new SchemaRegistryServiceImpl(storage,
checkMap, MockClock, pulsar);
+
+ var schemaRegistryStats =
+ Mockito.spy((SchemaRegistryStats)
FieldUtils.readField(schemaRegistryService, "stats", true));
+ // Disable periodic cleanup of Prometheus entries.
+ Mockito.doNothing().when(schemaRegistryStats).run();
+ FieldUtils.writeField(schemaRegistryService, "stats",
schemaRegistryStats, true);
+
setupDefaultTenantAndNamespace();
}
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+ pulsarTestContextBuilder.enableOpenTelemetry(true);
+ }
+
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
@@ -118,6 +140,32 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
getSchema(schemaId, version(0));
deleteSchema(schemaId, version(1));
+ var otelMetrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertThat(otelMetrics).anySatisfy(metric -> assertThat(metric)
+
.hasName(SchemaRegistryStats.SCHEMA_REGISTRY_REQUEST_DURATION_METRIC_NAME)
+ .hasHistogramSatisfying(histogram ->
histogram.hasPointsSatisfying(
+ point -> point
+
.hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE,
"tenant/ns",
+ SchemaRegistryStats.REQUEST_TYPE_KEY,
"delete",
+ SchemaRegistryStats.RESPONSE_TYPE_KEY,
"success"))
+ .hasCount(1),
+ point -> point
+
.hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE,
"tenant/ns",
+ SchemaRegistryStats.REQUEST_TYPE_KEY,
"put",
+ SchemaRegistryStats.RESPONSE_TYPE_KEY,
"success"))
+ .hasCount(1),
+ point -> point
+
.hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE,
"tenant/ns",
+ SchemaRegistryStats.REQUEST_TYPE_KEY,
"list",
+ SchemaRegistryStats.RESPONSE_TYPE_KEY,
"success"))
+ .hasCount(1),
+ point -> point
+
.hasAttributes(Attributes.of(OpenTelemetryAttributes.PULSAR_NAMESPACE,
"tenant/ns",
+ SchemaRegistryStats.REQUEST_TYPE_KEY,
"get",
+ SchemaRegistryStats.RESPONSE_TYPE_KEY,
"success"))
+ .hasCount(1)
+ )));
+
ByteArrayOutputStream output = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, false, false, false,
output);
output.flush();
@@ -309,16 +357,39 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
putSchema(schemaId1, schemaData2, version(1));
}
- @Test(expectedExceptions = ExecutionException.class)
+ @Test
public void checkIsCompatible() throws Exception {
- putSchema(schemaId1, schemaData1, version(0),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
- putSchema(schemaId1, schemaData2, version(1),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
-
- assertTrue(schemaRegistryService.isCompatible(schemaId1, schemaData3,
- SchemaCompatibilityStrategy.BACKWARD).get());
- assertFalse(schemaRegistryService.isCompatible(schemaId1, schemaData3,
- SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE).get());
- putSchema(schemaId1, schemaData3, version(2),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
+ var schemaId = BrokerTestUtil.newUniqueName("tenant/ns/topic");
+ putSchema(schemaId, schemaData1, version(0), BACKWARD_TRANSITIVE);
+ putSchema(schemaId, schemaData2, version(1), BACKWARD_TRANSITIVE);
+
+ var timeout = Duration.ofSeconds(1);
+ assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3,
BACKWARD))
+ .succeedsWithin(timeout, InstanceOfAssertFactories.BOOLEAN)
+ .isTrue();
+ assertThat(schemaRegistryService.isCompatible(schemaId, schemaData3,
BACKWARD_TRANSITIVE))
+ .failsWithin(timeout)
+ .withThrowableOfType(ExecutionException.class)
+ .withCauseInstanceOf(IncompatibleSchemaException.class);
+ assertThatThrownBy(() -> putSchema(schemaId, schemaData3, version(2),
BACKWARD_TRANSITIVE))
+ .isInstanceOf(ExecutionException.class)
+ .hasCauseInstanceOf(IncompatibleSchemaException.class);
+
+
assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+
.hasName(SchemaRegistryStats.COMPATIBLE_COUNTER_METRIC_NAME)
+ .hasLongSumSatisfying(
+ sum -> sum.hasPointsSatisfying(
+ point -> point
+ .hasAttributes(Attributes.of(
+
OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+
SchemaRegistryStats.COMPATIBILITY_CHECK_RESPONSE_KEY, "compatible"))
+ .hasValue(2),
+ point -> point
+ .hasAttributes(Attributes.of(
+
OpenTelemetryAttributes.PULSAR_NAMESPACE, "tenant/ns",
+
SchemaRegistryStats.COMPATIBILITY_CHECK_RESPONSE_KEY, "incompatible"))
+ .hasValue(2))));
}
@Test
@@ -374,20 +445,13 @@ public class SchemaServiceTest extends
MockedPulsarServiceBaseTest {
assertEquals(expectedVersion, version);
}
- private SchemaData randomSchema() {
- UUID randomString = UUID.randomUUID();
- return SchemaData.builder()
- .user(userId)
- .type(SchemaType.JSON)
- .timestamp(MockClock.millis())
- .isDeleted(false)
- .data(randomString.toString().getBytes())
- .props(new TreeMap<>())
- .build();
- }
-
private static SchemaData getSchemaData(String schemaJson) {
- return
SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).user(userId).build();
+ return SchemaData.builder()
+ .data(schemaJson.getBytes())
+ .type(SchemaType.AVRO)
+ .user(userId)
+ .timestamp(MockClock.millis())
+ .build();
}
private SchemaVersion version(long version) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index c8c7c3b2ccc..e006b72fad2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -41,6 +41,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
+import org.apache.pulsar.TestNGInstanceOrder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import
org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import
org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
@@ -66,10 +67,12 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
+import org.testng.annotations.Listeners;
import org.testng.annotations.Test;
@Test(groups = "broker-api")
@Slf4j
+@Listeners({ TestNGInstanceOrder.class })
public class SimpleSchemaTest extends ProducerConsumerBase {
private static final String NAMESPACE = "my-property/my-ns";