This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2de91e532fd3e7dcee60c1609a71629acd6ae5e1 Author: Matteo Merli <mme...@apache.org> AuthorDate: Fri May 30 12:31:17 2025 -0700 [improve] Enable metrics for all broker caches (#24365) --- distribution/shell/src/assemble/LICENSE.bin.txt | 2 ++ .../broker/authentication/oidc/JwksCache.java | 2 ++ .../oidc/OpenIDProviderMetadataCache.java | 2 ++ .../authentication/AuthenticationProviderSasl.java | 2 ++ pulsar-broker/pom.xml | 5 ----- .../pulsar/broker/ManagedLedgerClientFactory.java | 6 ++++++ .../pulsar/broker/namespace/OwnershipCache.java | 2 +- .../SystemTopicBasedTopicPoliciesService.java | 3 +++ .../apache/pulsar/broker/web/PulsarWebResource.java | 5 +++++ .../common/naming/NamespaceBundleFactory.java | 2 +- .../java/org/apache/pulsar/stats/package-info.java | 19 ------------------- pulsar-client-shaded/pom.xml | 10 ++++++++++ pulsar-common/pom.xml | 5 +++++ .../pulsar/common}/stats/CacheMetricsCollector.java | 2 +- .../pulsar/functions/instance/ProducerCache.java | 2 ++ .../apache/pulsar/metadata/api/MetadataStore.java | 19 ++++++++++++++++++- .../metadata/cache/impl/MetadataCacheImpl.java | 17 ++++++++++------- .../pulsar/metadata/impl/AbstractMetadataStore.java | 21 ++++++++++++++++----- .../metadata/impl/FaultInjectionMetadataStore.java | 3 ++- 19 files changed, 88 insertions(+), 41 deletions(-) diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 9554bd85dfc..e77a0f1c459 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -324,6 +324,8 @@ The Apache Software License, Version 2.0 - jackson-datatype-jdk8-2.17.2.jar - jackson-datatype-jsr310-2.17.2.jar - jackson-module-parameter-names-2.17.2.jar + * Caffeine -- caffeine-2.9.1.jar + * simpleclient_caffeine-0.16.0.jar * Conscrypt -- conscrypt-openjdk-uber-2.5.2.jar * Gson - gson-2.8.9.jar diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java index c88661c39c6..4358ca736e6 100644 --- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java +++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java @@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit; import javax.naming.AuthenticationException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.asynchttpclient.AsyncHttpClient; public class JwksCache { @@ -91,6 +92,7 @@ public class JwksCache { .refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS) .expireAfterWrite(expireAfterSeconds, TimeUnit.SECONDS) .buildAsync(loader); + CacheMetricsCollector.CAFFEINE.addCache("oidc-jwks-cache", cache); } CompletableFuture<Jwk> getJwk(String jwksUri, String keyId) { diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java index ebf8168b494..e6dee97f731 100644 --- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java +++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java @@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit; import javax.naming.AuthenticationException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.asynchttpclient.AsyncHttpClient; import org.jspecify.annotations.NonNull; @@ -81,6 +82,7 @@ class OpenIDProviderMetadataCache { .refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS) .expireAfterWrite(expireAfterSeconds, TimeUnit.SECONDS) .buildAsync(loader); + CacheMetricsCollector.CAFFEINE.addCache("open-id-provider-metadata", cache); } /** diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java index f8841193ba2..45463f837db 100644 --- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java +++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java @@ -57,6 +57,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.sasl.JAASCredentialsContainer; import org.apache.pulsar.common.sasl.SaslConstants; +import org.apache.pulsar.common.stats.CacheMetricsCollector; /** * Authentication Provider for SASL (Simple Authentication and Security Layer). @@ -122,6 +123,7 @@ public class AuthenticationProviderSasl implements AuthenticationProvider { this.authStates = Caffeine.newBuilder() .maximumSize(config.getMaxInflightSaslContext()) .expireAfterWrite(config.getInflightSaslContextExpiryMs(), TimeUnit.MILLISECONDS).build(); + CacheMetricsCollector.CAFFEINE.addCache("auth-sasl-states-cache", authStates); } @Override diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 06fa4ea37c4..83b8f56cf4c 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -362,11 +362,6 @@ <artifactId>simpleclient_hotspot</artifactId> </dependency> - <dependency> - <groupId>io.prometheus</groupId> - <artifactId>simpleclient_caffeine</artifactId> - </dependency> - <dependency> <groupId>io.swagger</groupId> <artifactId>swagger-core</artifactId> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index b060475a43f..6e307378b16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -45,6 +45,7 @@ import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +60,11 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync(); private StatsProvider statsProvider = new NullStatsProvider(); + public ManagedLedgerClientFactory() { + CacheMetricsCollector.CAFFEINE.addCache("managed-ledger-bk-ensemble-client-cache", + bkEnsemblePolicyToBkClientMap); + } + @Override public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, BookKeeperClientFactory bookkeeperProvider, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java index ce68c036a62..ba7f48e64c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java @@ -37,10 +37,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundles; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.coordination.LockManager; import org.apache.pulsar.metadata.api.coordination.ResourceLock; -import org.apache.pulsar.stats.CacheMetricsCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 4745decf58d..d784f07a6ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -131,6 +132,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic .createTopicPoliciesSystemTopicClient(namespaceName); return systemTopicClient.newWriterAsync(); }); + + CacheMetricsCollector.CAFFEINE.addCache("system-topic-policies-writer-cache", writerCaches); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 5be4a170db0..b8efd67647b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -93,6 +93,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.path.PolicyPath; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LockManager; @@ -119,6 +120,10 @@ public abstract class PulsarWebResource { } }); + static { + CacheMetricsCollector.CAFFEINE.addCache("web-resource-service-name-resolver", SERVICE_NAME_RESOLVER_CACHE); + } + static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal"; @Context diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 2b285cbb0e2..69f5208ce67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -54,10 +54,10 @@ import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.policies.data.loadbalancer.BundleData; -import org.apache.pulsar.stats.CacheMetricsCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/stats/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/stats/package-info.java deleted file mode 100644 index a7fd90a63ad..00000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/stats/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.stats; diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index 847a92cc626..4c5bf0c48b4 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -177,6 +177,8 @@ <include>org.yaml:snakeyaml</include> <include>org.apache.pulsar:pulsar-client-dependencies-minimized</include> <include>org.roaringbitmap:RoaringBitmap</include> + <include>io.prometheus:*</include> + <include>com.github.ben-manes.caffeine:*</include> </includes> <excludes> <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude> @@ -384,6 +386,14 @@ <pattern>org.yaml</pattern> <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern> </relocation> + <relocation> + <pattern>com.github.benmanes</pattern> + <shadedPattern>org.apache.pulsar.shade.com.github.benmanes</shadedPattern> + </relocation> + <relocation> + <pattern>io.prometheus.client</pattern> + <shadedPattern>org.apache.pulsar.shade.io.prometheus.client</shadedPattern> + </relocation> </relocations> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index 3fa2fbe7697..e414a3a886d 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -82,6 +82,11 @@ <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient_caffeine</artifactId> + </dependency> + <dependency> <groupId>org.jspecify</groupId> <artifactId>jspecify</artifactId> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/stats/CacheMetricsCollector.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/CacheMetricsCollector.java similarity index 96% rename from pulsar-broker/src/main/java/org/apache/pulsar/stats/CacheMetricsCollector.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/stats/CacheMetricsCollector.java index ebf58562e57..903c7172be5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/stats/CacheMetricsCollector.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/CacheMetricsCollector.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.stats; +package org.apache.pulsar.common.stats; import lombok.experimental.UtilityClass; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java index 2e10581b352..17feaef13fe 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; @Slf4j @@ -107,6 +108,7 @@ public class ProducerCache implements Closeable { builder.expireAfterAccess(Duration.ofSeconds(PRODUCER_CACHE_TIMEOUT_SECONDS)); } cache = builder.build(); + CacheMetricsCollector.CAFFEINE.addCache("function-producer-cache", cache); } public <T> Producer<T> getOrCreateProducer(CacheArea cacheArea, String topicName, Object additionalCacheKey, diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index 89b0e7a6fe1..f0ec8f52375 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -219,9 +219,13 @@ public interface MetadataStore extends AutoCloseable { * the custom serialization/deserialization object * @param cacheConfig * the cache configuration to be used + * @deprecated use {@link #getMetadataCache(String, MetadataSerde, MetadataCacheConfig)} * @return the metadata cache object */ - <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig); + @Deprecated + default <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) { + return getMetadataCache("serde", serde, cacheConfig); + } /** * Create a metadata cache that uses a particular serde object. @@ -235,6 +239,19 @@ public interface MetadataStore extends AutoCloseable { return getMetadataCache(serde, getDefaultMetadataCacheConfig()); } + /** + * Create a metadata cache that uses a particular serde object. + * + * @param <T> + * @param serde + * the custom serialization/deserialization object + * @return the metadata cache object + */ + default <T> MetadataCache<T> getMetadataCache(String cacheName, MetadataSerde<T> serde, + MetadataCacheConfig cacheConfig) { + return getMetadataCache(serde, cacheConfig); + } + /** * Returns the default metadata cache config. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 8607354f15c..f11e3dc1980 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -39,6 +39,7 @@ import java.util.function.Supplier; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.CacheGetResult; import org.apache.pulsar.metadata.api.GetResult; @@ -66,18 +67,18 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache; - public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef, MetadataCacheConfig<T> cacheConfig, - ScheduledExecutorService executor) { - this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor); + public MetadataCacheImpl(String cacheName, MetadataStore store, TypeReference<T> typeRef, + MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) { + this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor); } - public MetadataCacheImpl(MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig, + public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) { - this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor); + this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor); } - public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, MetadataCacheConfig<T> cacheConfig, - ScheduledExecutorService executor) { + public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde<T> serde, + MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) { this.store = store; if (store instanceof MetadataStoreExtended) { this.storeExtended = (MetadataStoreExtended) store; @@ -121,6 +122,8 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica } } }); + + CacheMetricsCollector.CAFFEINE.addCache(cacheName, objCache); } private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String path) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 8e201617a4b..cee1fc549ad 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -19,6 +19,7 @@ package org.apache.pulsar.metadata.impl; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.type.TypeFactory; import com.github.benmanes.caffeine.cache.AsyncCacheLoader; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; @@ -48,6 +49,7 @@ import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.stats.CacheMetricsCollector; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; @@ -115,6 +117,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co } } }); + CacheMetricsCollector.CAFFEINE.addCache(metadataStoreName + "-children", childrenCache); this.existsCache = Caffeine.newBuilder() .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) @@ -136,6 +139,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co } } }); + CacheMetricsCollector.CAFFEINE.addCache(metadataStoreName + "-exists", childrenCache); this.metadataStoreName = metadataStoreName; this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, openTelemetry); @@ -235,22 +239,29 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co @Override public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig cacheConfig) { - MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, - TypeFactory.defaultInstance().constructSimpleType(clazz, null), cacheConfig, this.executor); + JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null); + String cacheName = String.format("%s-%s", metadataStoreName, typeRef.getTypeName()); + MetadataCacheImpl<T> metadataCache = + new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor); metadataCaches.add(metadataCache); return metadataCache; } @Override public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) { - MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, typeRef, cacheConfig, this.executor); + String cacheName = String.format("%s-%s", metadataStoreName, typeRef.getType().getTypeName()); + MetadataCacheImpl<T> metadataCache = + new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor); metadataCaches.add(metadataCache); return metadataCache; } @Override - public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) { - MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, serde, cacheConfig, this.executor); + public <T> MetadataCache<T> getMetadataCache(String cacheName, MetadataSerde<T> serde, + MetadataCacheConfig cacheConfig) { + MetadataCacheImpl<T> metadataCache = + new MetadataCacheImpl<>(String.format("%s-%s", metadataStoreName, cacheName), this, serde, cacheConfig, + this.executor); metadataCaches.add(metadataCache); return metadataCache; } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java index 360b8c91d0b..91cd3754d69 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java @@ -161,7 +161,8 @@ public class FaultInjectionMetadataStore implements MetadataStoreExtended { } @Override - public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, MetadataCacheConfig cacheConfig) { + public <T> MetadataCache<T> getMetadataCache(String cacheName, MetadataSerde<T> serde, + MetadataCacheConfig cacheConfig) { return injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig)); }