This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2de91e532fd3e7dcee60c1609a71629acd6ae5e1
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri May 30 12:31:17 2025 -0700

    [improve] Enable metrics for all broker caches (#24365)
---
 distribution/shell/src/assemble/LICENSE.bin.txt     |  2 ++
 .../broker/authentication/oidc/JwksCache.java       |  2 ++
 .../oidc/OpenIDProviderMetadataCache.java           |  2 ++
 .../authentication/AuthenticationProviderSasl.java  |  2 ++
 pulsar-broker/pom.xml                               |  5 -----
 .../pulsar/broker/ManagedLedgerClientFactory.java   |  6 ++++++
 .../pulsar/broker/namespace/OwnershipCache.java     |  2 +-
 .../SystemTopicBasedTopicPoliciesService.java       |  3 +++
 .../apache/pulsar/broker/web/PulsarWebResource.java |  5 +++++
 .../common/naming/NamespaceBundleFactory.java       |  2 +-
 .../java/org/apache/pulsar/stats/package-info.java  | 19 -------------------
 pulsar-client-shaded/pom.xml                        | 10 ++++++++++
 pulsar-common/pom.xml                               |  5 +++++
 .../pulsar/common}/stats/CacheMetricsCollector.java |  2 +-
 .../pulsar/functions/instance/ProducerCache.java    |  2 ++
 .../apache/pulsar/metadata/api/MetadataStore.java   | 19 ++++++++++++++++++-
 .../metadata/cache/impl/MetadataCacheImpl.java      | 17 ++++++++++-------
 .../pulsar/metadata/impl/AbstractMetadataStore.java | 21 ++++++++++++++++-----
 .../metadata/impl/FaultInjectionMetadataStore.java  |  3 ++-
 19 files changed, 88 insertions(+), 41 deletions(-)

diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 9554bd85dfc..e77a0f1c459 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -324,6 +324,8 @@ The Apache Software License, Version 2.0
      - jackson-datatype-jdk8-2.17.2.jar
      - jackson-datatype-jsr310-2.17.2.jar
      - jackson-module-parameter-names-2.17.2.jar
+ * Caffeine -- caffeine-2.9.1.jar
+ * simpleclient_caffeine-0.16.0.jar
  * Conscrypt -- conscrypt-openjdk-uber-2.5.2.jar
  * Gson
     - gson-2.8.9.jar
diff --git 
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
 
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
index c88661c39c6..4358ca736e6 100644
--- 
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
+++ 
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
@@ -50,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 import javax.naming.AuthenticationException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.asynchttpclient.AsyncHttpClient;
 
 public class JwksCache {
@@ -91,6 +92,7 @@ public class JwksCache {
                 .refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS)
                 .expireAfterWrite(expireAfterSeconds, TimeUnit.SECONDS)
                 .buildAsync(loader);
+        CacheMetricsCollector.CAFFEINE.addCache("oidc-jwks-cache", cache);
     }
 
     CompletableFuture<Jwk> getJwk(String jwksUri, String keyId) {
diff --git 
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
 
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
index ebf8168b494..e6dee97f731 100644
--- 
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
+++ 
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import javax.naming.AuthenticationException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.asynchttpclient.AsyncHttpClient;
 import org.jspecify.annotations.NonNull;
 
@@ -81,6 +82,7 @@ class OpenIDProviderMetadataCache {
                 .refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS)
                 .expireAfterWrite(expireAfterSeconds, TimeUnit.SECONDS)
                 .buildAsync(loader);
+        CacheMetricsCollector.CAFFEINE.addCache("open-id-provider-metadata", 
cache);
     }
 
     /**
diff --git 
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
 
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index f8841193ba2..45463f837db 100644
--- 
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ 
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.sasl.JAASCredentialsContainer;
 import org.apache.pulsar.common.sasl.SaslConstants;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 
 /**
  * Authentication Provider for SASL (Simple Authentication and Security Layer).
@@ -122,6 +123,7 @@ public class AuthenticationProviderSasl implements 
AuthenticationProvider {
         this.authStates = Caffeine.newBuilder()
                 .maximumSize(config.getMaxInflightSaslContext())
                 .expireAfterWrite(config.getInflightSaslContextExpiryMs(), 
TimeUnit.MILLISECONDS).build();
+        CacheMetricsCollector.CAFFEINE.addCache("auth-sasl-states-cache", 
authStates);
     }
 
     @Override
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 06fa4ea37c4..83b8f56cf4c 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -362,11 +362,6 @@
       <artifactId>simpleclient_hotspot</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>io.prometheus</groupId>
-      <artifactId>simpleclient_caffeine</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>io.swagger</groupId>
       <artifactId>swagger-core</artifactId>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index b060475a43f..6e307378b16 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -45,6 +45,7 @@ import 
org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +60,11 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
             bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
     private StatsProvider statsProvider = new NullStatsProvider();
 
+    public ManagedLedgerClientFactory() {
+        
CacheMetricsCollector.CAFFEINE.addCache("managed-ledger-bk-ensemble-client-cache",
+                bkEnsemblePolicyToBkClientMap);
+    }
+
     @Override
     public void initialize(ServiceConfiguration conf, MetadataStoreExtended 
metadataStore,
                            BookKeeperClientFactory bookkeeperProvider,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index ce68c036a62..ba7f48e64c4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -37,10 +37,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.coordination.LockManager;
 import org.apache.pulsar.metadata.api.coordination.ResourceLock;
-import org.apache.pulsar.stats.CacheMetricsCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 4745decf58d..d784f07a6ca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.jspecify.annotations.NonNull;
 import org.jspecify.annotations.Nullable;
@@ -131,6 +132,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                             
.createTopicPoliciesSystemTopicClient(namespaceName);
                     return systemTopicClient.newWriterAsync();
                 });
+
+        
CacheMetricsCollector.CAFFEINE.addCache("system-topic-policies-writer-cache", 
writerCaches);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 5be4a170db0..b8efd67647b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -93,6 +93,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.path.PolicyPath;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.coordination.LockManager;
@@ -119,6 +120,10 @@ public abstract class PulsarWebResource {
                         }
                     });
 
+    static {
+        
CacheMetricsCollector.CAFFEINE.addCache("web-resource-service-name-resolver", 
SERVICE_NAME_RESOLVER_CACHE);
+    }
+
     static final String ORIGINAL_PRINCIPAL_HEADER = "X-Original-Principal";
 
     @Context
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 2b285cbb0e2..69f5208ce67 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -54,10 +54,10 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.policies.data.loadbalancer.BundleData;
-import org.apache.pulsar.stats.CacheMetricsCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/stats/package-info.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/stats/package-info.java
deleted file mode 100644
index a7fd90a63ad..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/stats/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.stats;
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 847a92cc626..4c5bf0c48b4 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -177,6 +177,8 @@
                   <include>org.yaml:snakeyaml</include>
                   
<include>org.apache.pulsar:pulsar-client-dependencies-minimized</include>
                   <include>org.roaringbitmap:RoaringBitmap</include>
+                  <include>io.prometheus:*</include>
+                  <include>com.github.ben-manes.caffeine:*</include>
                 </includes>
                 <excludes>
                   
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
@@ -384,6 +386,14 @@
                   <pattern>org.yaml</pattern>
                   
<shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.github.benmanes</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.com.github.benmanes</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>io.prometheus.client</pattern>
+                  
<shadedPattern>org.apache.pulsar.shade.io.prometheus.client</shadedPattern>
+                </relocation>
               </relocations>
               <transformers>
                 <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 3fa2fbe7697..e414a3a886d 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -82,6 +82,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_caffeine</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.jspecify</groupId>
       <artifactId>jspecify</artifactId>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/stats/CacheMetricsCollector.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/CacheMetricsCollector.java
similarity index 96%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/stats/CacheMetricsCollector.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/common/stats/CacheMetricsCollector.java
index ebf58562e57..903c7172be5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/stats/CacheMetricsCollector.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/CacheMetricsCollector.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.stats;
+package org.apache.pulsar.common.stats;
 
 import lombok.experimental.UtilityClass;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
index 2e10581b352..17feaef13fe 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
@@ -107,6 +108,7 @@ public class ProducerCache implements Closeable {
             
builder.expireAfterAccess(Duration.ofSeconds(PRODUCER_CACHE_TIMEOUT_SECONDS));
         }
         cache = builder.build();
+        CacheMetricsCollector.CAFFEINE.addCache("function-producer-cache", 
cache);
     }
 
     public <T> Producer<T> getOrCreateProducer(CacheArea cacheArea, String 
topicName, Object additionalCacheKey,
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 89b0e7a6fe1..f0ec8f52375 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -219,9 +219,13 @@ public interface MetadataStore extends AutoCloseable {
      *            the custom serialization/deserialization object
      * @param cacheConfig
      *          the cache configuration to be used
+     * @deprecated use {@link #getMetadataCache(String, MetadataSerde, 
MetadataCacheConfig)}
      * @return the metadata cache object
      */
-    <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, 
MetadataCacheConfig cacheConfig);
+    @Deprecated
+    default <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, 
MetadataCacheConfig cacheConfig) {
+        return getMetadataCache("serde", serde, cacheConfig);
+    }
 
     /**
      * Create a metadata cache that uses a particular serde object.
@@ -235,6 +239,19 @@ public interface MetadataStore extends AutoCloseable {
         return getMetadataCache(serde, getDefaultMetadataCacheConfig());
     }
 
+    /**
+     * Create a metadata cache that uses a particular serde object.
+     *
+     * @param <T>
+     * @param serde
+     *            the custom serialization/deserialization object
+     * @return the metadata cache object
+     */
+    default <T> MetadataCache<T> getMetadataCache(String cacheName, 
MetadataSerde<T> serde,
+                                                  MetadataCacheConfig 
cacheConfig) {
+        return getMetadataCache(serde, cacheConfig);
+    }
+
     /**
      * Returns the default metadata cache config.
      *
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 8607354f15c..f11e3dc1980 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,6 +39,7 @@ import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.metadata.api.CacheGetResult;
 import org.apache.pulsar.metadata.api.GetResult;
@@ -66,18 +67,18 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
 
     private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> 
objCache;
 
-    public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef, 
MetadataCacheConfig<T> cacheConfig,
-                             ScheduledExecutorService executor) {
-        this(store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, 
executor);
+    public MetadataCacheImpl(String cacheName, MetadataStore store, 
TypeReference<T> typeRef,
+                             MetadataCacheConfig<T> cacheConfig, 
ScheduledExecutorService executor) {
+        this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), 
cacheConfig, executor);
     }
 
-    public MetadataCacheImpl(MetadataStore store, JavaType type, 
MetadataCacheConfig<T> cacheConfig,
+    public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType 
type, MetadataCacheConfig<T> cacheConfig,
                              ScheduledExecutorService executor) {
-        this(store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, 
executor);
+        this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), 
cacheConfig, executor);
     }
 
-    public MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde, 
MetadataCacheConfig<T> cacheConfig,
-                             ScheduledExecutorService executor) {
+    public MetadataCacheImpl(String cacheName, MetadataStore store, 
MetadataSerde<T> serde,
+                             MetadataCacheConfig<T> cacheConfig, 
ScheduledExecutorService executor) {
         this.store = store;
         if (store instanceof MetadataStoreExtended) {
             this.storeExtended = (MetadataStoreExtended) store;
@@ -121,6 +122,8 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                         }
                     }
                 });
+
+        CacheMetricsCollector.CAFFEINE.addCache(cacheName, objCache);
     }
 
     private CompletableFuture<Optional<CacheGetResult<T>>> 
readValueFromStore(String path) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 8e201617a4b..cee1fc549ad 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata.impl;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
@@ -48,6 +49,7 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -115,6 +117,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                         }
                     }
                 });
+        CacheMetricsCollector.CAFFEINE.addCache(metadataStoreName + 
"-children", childrenCache);
 
         this.existsCache = Caffeine.newBuilder()
                 .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, 
TimeUnit.MILLISECONDS)
@@ -136,6 +139,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                         }
                     }
                 });
+        CacheMetricsCollector.CAFFEINE.addCache(metadataStoreName + "-exists", existsCache);
 
         this.metadataStoreName = metadataStoreName;
         this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, 
openTelemetry);
@@ -235,22 +239,29 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, 
MetadataCacheConfig cacheConfig) {
-        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
-                TypeFactory.defaultInstance().constructSimpleType(clazz, 
null), cacheConfig, this.executor);
+        JavaType typeRef = 
TypeFactory.defaultInstance().constructSimpleType(clazz, null);
+        String cacheName = String.format("%s-%s", metadataStoreName, 
typeRef.getTypeName());
+        MetadataCacheImpl<T> metadataCache =
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
 
     @Override
     public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, 
MetadataCacheConfig cacheConfig) {
-        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, 
typeRef, cacheConfig, this.executor);
+        String cacheName = String.format("%s-%s", metadataStoreName, 
typeRef.getType().getTypeName());
+        MetadataCacheImpl<T> metadataCache =
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, 
MetadataCacheConfig cacheConfig) {
-        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<>(this, 
serde, cacheConfig, this.executor);
+    public <T> MetadataCache<T> getMetadataCache(String cacheName, 
MetadataSerde<T> serde,
+                                                 MetadataCacheConfig 
cacheConfig) {
+        MetadataCacheImpl<T> metadataCache =
+                new MetadataCacheImpl<>(String.format("%s-%s", 
metadataStoreName, cacheName), this, serde, cacheConfig,
+                        this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 360b8c91d0b..91cd3754d69 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -161,7 +161,8 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
     }
 
     @Override
-    public <T> MetadataCache<T> getMetadataCache(MetadataSerde<T> serde, 
MetadataCacheConfig cacheConfig) {
+    public <T> MetadataCache<T> getMetadataCache(String cacheName, 
MetadataSerde<T> serde,
+                                                 MetadataCacheConfig 
cacheConfig) {
         return 
injectMetadataStoreInMetadataCache(store.getMetadataCache(serde, cacheConfig));
     }
 

Reply via email to